Using Push Queues in Go
In App Engine push queues, a task is a unit of work to be performed by the application. Each task is an object of the Task type. Each Task object contains an application-specific URL with a request handler for the task, and an optional data payload that parameterizes the task.
For example, consider a calendaring application that needs to notify an invitee, via email, that an event has been updated. The data payload for this task consists of the email address and name of the invitee, along with a description of the event. The webhook might live at /app_worker/send_email and contain a function that adds the relevant strings to an email template and sends the email. The app can create a separate task for each email it needs to send.
You can use push queues only within the App Engine environment; if you need to access App Engine tasks from outside of App Engine, use pull queues.
- Using push queues
- Task execution
- Deferred tasks
- URL endpoints
- Push queues and the development server
- Push queues and backends
- Quotas and limits for push queues
Using push queues
A Go app sets up queues using a configuration file named queue.yaml (see Go Task Queue Configuration). If an app does not have a queue.yaml file, it has a queue named default with some default settings.
To enqueue a task, you call the taskqueue.Add function. The task consists of data for a request, including a URL path, parameters, HTTP headers, and an HTTP payload. It can also include the earliest time to execute the task (the default is as soon as possible) and a name for the task. The task is added to a queue, then performed by the Task Queue service as the queue is processed.
The following example defines a task handler (worker) that increments a counter in the datastore, mapped to the URL /worker. It also defines a user-accessible request handler that displays the current value of the counter for a GET request, and for a POST request enqueues a task and returns. It's difficult to visualize the execution of tasks with the user-accessible handler if the queue processes them too quickly. Therefore, the task in this example should run at a rate no greater than once per second.
package counter
import (
"html/template"
"net/http"
"appengine"
"appengine/datastore"
"appengine/taskqueue"
)
func init() {
http.HandleFunc("/", handler)
http.HandleFunc("/worker", worker)
}
type Counter struct {
Name string
Count int
}
func handler(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
if name := r.FormValue("name"); name != "" {
t := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {name}})
if _, err := taskqueue.Add(c, t, ""); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
q := datastore.NewQuery("Counter")
var counters []Counter
if _, err := q.GetAll(c, &counters); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := handlerTemplate.Execute(w, counters); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// OK
}
func worker(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
name := r.FormValue("name")
key := datastore.NewKey(c, "Counter", name, 0, nil)
var counter Counter
if err := datastore.Get(c, key, &counter); err == datastore.ErrNoSuchEntity {
counter.Name = name
} else if err != nil {
c.Errorf("%v", err)
return
}
counter.Count++
if _, err := datastore.Put(c, key, &counter); err != nil {
c.Errorf("%v", err)
}
}
var handlerTemplate = template.Must(template.New("handler").Parse(handlerHTML))
const handlerHTML = `
{{repeat .}}
<p>{{.Name}}: {{.Count}}</p>
{{end}}
<p>Start a new counter:</p>
<form action="/" method="POST">
<input type="text" name="name">
<input type="submit" value="Add">
</form>
`
Note that this example is not idempotent. It is possible for the task queue to execute a task more than once. In this case, the counter is incremented each time the task is run, possibly skewing the results.
Task execution
App Engine executes push tasks by calling application-specific URLs with request handlers for those tasks. These URLs must be local to your application root directory and specified as a relative URL. You attach this URL to the task definition using the Path field of the Task struct.
Programmatically referring to a bundled HTTP request in this fashion is sometimes called a "web hook." You can specify web hooks ahead of time, without waiting for their actual execution. Thus, an application can create many web hooks at once and then hand them off to App Engine. The system then processes them asynchronously (by invoking the HTTP request). This web hook model enables efficient parallel processing.
A task must finish executing and send an HTTP response value between 200–299 within 10 minutes of the original request. This deadline is separate from user requests, which have a 60-second deadline. If the task failed to execute, App Engine retries it based on criteria that you can configure.
Task request headers
Requests from the Task Queue service contain the following HTTP headers:
X-AppEngine-QueueName, the name of the queue (possiblydefault)X-AppEngine-TaskName, the name of the task, or a system-generated unique ID if no name was specifiedX-AppEngine-TaskRetryCount, the number of times this task has been retried; for the first attempt, this value is 0. This number includes attempts where the task failed due to a lack of available instances and never reached the execution phase.X-AppEngine-TaskExecutionCount, the number of times this task has previously failed during the execution phase. This number does not include failures due to a lack of available instances.X-AppEngine-TaskETA, the target execution time of the task, specified in milliseconds since January 1st 1970.
These headers are set internally by Google App Engine. If your request handler finds any of these headers, it can trust that the request is a Task Queue request. If any of the above headers are present in an external user request to your app, they are stripped. The exception being requests from logged in administrators of the application, who are allowed to set the headers for testing purposes.
Tasks may be created with the X-AppEngine-FailFast header, which specifies that a task running on a backend fails immediately instead of waiting in a pending queue.
Google App Engine issues Task Queue requests from the IP address 0.1.0.2.
The rate of task execution
You set the maximum processing rate for the entire queue when you configure the queue. App Engine uses a token bucket algorithm to execute tasks once they've been delivered to the queue. Each queue has a token bucket, and each bucket holds a certain number of tokens. Your app consumes a token each time it executes a task. If the bucket runs out of tokens, the system pauses until the bucket has more tokens. The rate at which the bucket is refilled is the limiting factor that determines the rate of the queue. See Defining Push Queues and Processing Rates for more details.
To ensure that the Task Queue system does not overwhelm your application, it may throttle the rate at which requests are sent. This throttled rate is known as the enforced rate. The enforced rate may be decreased when your application returns a 503 HTTP response code, or if there are no instances able to execute a request for an extended period of time. You can view the enforced rate on the Task Queue tab of the Administration Console.
The order of task execution
The order in which tasks are executed depends on several factors:
- The position of the task in the queue. App Engine attempts to process tasks based on FIFO (first in, first out) order. In general, tasks are inserted into the end of a queue, and executed from the head of the queue.
- The backlog of tasks in the queue. The system attempts to deliver the lowest latency possible for any given task via specially optimized notifications to the scheduler. Thus, in the case that a queue has a large backlog of tasks, the system's scheduling may "jump" new tasks to the head of the queue.
- The value of the task's ETA field. This specifies the earliest time that a task can execute. App Engine always waits until after the specified ETA to process push tasks.
- The value of the task's Delay field. This specifies the minimum number of seconds to wait before executing a task. Delay and ETA are mutually exclusive; if you specify one, do not specify the other.
Task retries
If a push task request handler returns an HTTP status code within the range 200–299, App Engine considers the task to have completed successfully. If the task returns a status code outside of this range, App Engine retries the task until it succeeds. The system backs off gradually to avoid flooding your application with too many requests, but schedules retry attempts for failed tasks to recur at a maximum of once per hour.
You can also configure your own scheme for task retries using the retry_parameters directive in queue.yaml.
When implementing the code for tasks (as worker URLs within your app), it is important to consider whether the task is idempotent. App Engine's Task Queue API is designed to only invoke a given task once; however, it is possible in exceptional circumstances that a task may execute multiple times (such as in the unlikely case of major system failure). Thus, your code must ensure that there are no harmful side-effects of repeated execution.
Deferred tasks
Setting up a handler for each distinct task (as described in the previous sections) can be cumbersome, as can serializing and deserializing complex arguments for the task—particularly if you have many diverse but small tasks that you want to run on the queue. The Go SDK includes a package (appengine/delay) exposing a simple API that allows you to bypass all the work of setting up dedicated task handlers and serializing and deserializing your parameters.
To use the delay package:
import "appengine/delay"
var expensiveFunc = delay.Func("some-arbitrary-key", func(c appengine.Context, a string, b int) {
// do something expensive!
})
// Somewhere else
expensiveFunc.Call(c, "Hello, world!", 42)
The delay package serializes your function call and its arguments, then adds it to the task queue. When the task is executed, the delay package executes the function.
For more information about using the delay package, refer to its documentation.
URL endpoints
Push tasks reference their implementation via URL. For example, a task which fetches and parses an RSS feed might use a worker URL called /app_worker/fetch_feed. You can specify this worker URL or use the default. In general, you can use any URL as the worker for a task, so long as it is within your application; all task worker URLs must be specified as relative URLs:
import "appengine/taskqueue"
t := &taskqueue.Task{Path: "/path/to/my/worker"}
t := &taskqueue.Task{Path: "/path?a=b&c=d", Method: "GET"}
If you do not specify a worker URL, the task uses a default worker URL named after the queue:
/_ah/queue/queue_name
A queue's default URL is used if, and only if, a task does not have a worker URL of its own. If a task does have its own worker URL, then it is only invoked at the worker URL, never another. Once inserted into a queue, its url endpoint cannot be changed.
You can also target tasks to App Engine Backends. Backends allow you to process tasks beyond the 10-minute deadline for task execution. See Push Queues and Backends for more information.
Securing URLs for tasks
You can prevent users from accessing URLs of tasks by restricting access to administrator accounts. Task queues can access admin-only URLs. You can restrict a URL by adding login: admin to the handler configuration in app.yaml.
An example might look like this in app.yaml:
application: hello-tasks
version: 1
runtime: go
api_version: go1
handlers:
- url: /tasks/process
script: _go_app
login: admin
For more information see Go Application Configuration: Requiring Login or Administrator Status.
To test a task web hook, sign in as an administrator and visit the URL of the handler in your browser.
Push queues and the development server
When your app is running in the development server, tasks are automatically executed at the appropriate time just as in production. To disable tasks from running in the development server, run the following command:
dev_appserver.py --disable_task_runnning
You can examine and manipulate tasks from the developer console at: http://localhost:8000/taskqueue.
To execute tasks, select the queue by clicking on its name, select the tasks to execute, and click Run Now. To clear a queue without executing any tasks, click Purge Queue.
Push queues and backends
Push tasks typically must finish execution within 10 minutes. If you have push tasks that require more time or computing resources to process, you can use App Engine Backends to process these tasks outside of the normal limits of App Engine applications. The following code sample demonstrates how to create a push task addressed to an instance 1 of a backend named backend1:
import (
"net/http"
"appengine"
"appengine/taskqueue"
)
key := r.FormValue("key")
// Create a task pointed at a backend.
t := taskqueue.NewPOSTTask("/path/to/my/worker/", url.Values{
"key": {key},
})
t.Header = http.Header{
"Host": []string{appengine.BackendHostname(c, "backend1", 1)},
}
// Add the task to the default queue.
err := taskqueue.Add(c, t, "")
Quotas and limits for push queues
Enqueuing a task in a push queue counts toward the following quotas:
- Task Queue Stored Task Count
- Task Queue Stored Task Bytes
- Task Queue API Calls
The Task Queue Stored Task Bytes quota is configurable in queue.yaml by setting total_storage_limit. This quota counts towards your Stored Data (billable) quota.
Execution of a task counts toward the following quotas:
- Requests
- Incoming Bandwidth
- Outgoing Bandwidth
The act of executing a task consumes bandwidth-related quotas for the request and response data, just as if the request handler were called by a remote client. When the task queue processes a task, the response data is discarded.
Once a task has been executed or deleted, the storage used by that task is reclaimed. The reclaiming of storage quota for tasks happens at regular intervals, and this may not be reflected in the storage quota immediately after the task is deleted.
For more information on quotas, see Quotas, and the "Quota Details" section of the Admin Console.
In addition to quotas, the following limits apply to the use of push queues:
| Limit | Amount |
|---|---|
| task object size | 100KB |
| number of active queues (not including the default queue) | 10 for free apps 100 for billed apps |
| queue execution rate | 500 task invocations per second per queue |
| maximum countdown/ETA for a task | 30 days from the current date and time |
| maximum number of tasks that can be added in a batch | 100 tasks |
| maximum number of tasks that can be added in a transaction | 5 tasks |