// Copyright 2011 Google Inc. All rights reserved. // Use of this source code is governed by the Apache 2.0 // license that can be found in the LICENSE file. /* Package delay provides a way to execute code outside the scope of a user request by using the taskqueue API. To declare a function that may be executed later, call Func in a top-level assignment context, passing it an arbitrary string key and a function whose first argument is of type context.Context. var laterFunc = delay.Func("key", myFunc) It is also possible to use a function literal. var laterFunc = delay.Func("key", func(c context.Context, x string) { // ... }) To call a function, invoke its Call method. laterFunc.Call(c, "something") A function may be called any number of times. If the function has any return arguments, and the last one is of type error, the function may return a non-nil error to signal that the function should be retried. The arguments to functions may be of any type that is encodable by the gob package. If an argument is of interface type, it is the client's responsibility to register with the gob package whatever concrete type may be passed for that argument; see http://golang.org/pkg/gob/#Register for details. Any errors during initialization or execution of a function will be logged to the application logs. Error logs that occur during initialization will be associated with the request that invoked the Call method. The state of a function invocation that has not yet successfully executed is preserved by combining the file name in which it is declared with the string key that was passed to the Func function. Updating an app with pending function invocations is safe as long as the relevant functions have the (filename, key) combination preserved. The delay package uses the Task Queue API to create tasks that call the reserved application path "/_ah/queue/go/delay". This path must not be marked as "login: required" in app.yaml; it must be marked as "login: admin" or have no access restriction. */ package delay // import "google.golang.org/appengine/delay" import ( "bytes" "encoding/gob" "errors" "fmt" "net/http" "reflect" "runtime" "golang.org/x/net/context" "google.golang.org/appengine" "google.golang.org/appengine/log" "google.golang.org/appengine/taskqueue" ) // Function represents a function that may have a delayed invocation. type Function struct { fv reflect.Value // Kind() == reflect.Func key string err error // any error during initialization } const ( // The HTTP path for invocations. path = "/_ah/queue/go/delay" // Use the default queue. queue = "" ) var ( // registry of all delayed functions funcs = make(map[string]*Function) // precomputed types contextType = reflect.TypeOf((*context.Context)(nil)).Elem() errorType = reflect.TypeOf((*error)(nil)).Elem() // errors errFirstArg = errors.New("first argument must be context.Context") ) // Func declares a new Function. The second argument must be a function with a // first argument of type context.Context. // This function must be called at program initialization time. That means it // must be called in a global variable declaration or from an init function. // This restriction is necessary because the instance that delays a function // call may not be the one that executes it. Only the code executed at program // initialization time is guaranteed to have been run by an instance before it // receives a request. func Func(key string, i interface{}) *Function { f := &Function{fv: reflect.ValueOf(i)} // Derive unique, somewhat stable key for this func. _, file, _, _ := runtime.Caller(1) f.key = file + ":" + key t := f.fv.Type() if t.Kind() != reflect.Func { f.err = errors.New("not a function") return f } if t.NumIn() == 0 || t.In(0) != contextType { f.err = errFirstArg return f } // Register the function's arguments with the gob package. // This is required because they are marshaled inside a []interface{}. // gob.Register only expects to be called during initialization; // that's fine because this function expects the same. for i := 0; i < t.NumIn(); i++ { // Only concrete types may be registered. If the argument has // interface type, the client is resposible for registering the // concrete types it will hold. if t.In(i).Kind() == reflect.Interface { continue } gob.Register(reflect.Zero(t.In(i)).Interface()) } if old := funcs[f.key]; old != nil { old.err = fmt.Errorf("multiple functions registered for %s in %s", key, file) } funcs[f.key] = f return f } type invocation struct { Key string Args []interface{} } // Call invokes a delayed function. // err := f.Call(c, ...) // is equivalent to // t, _ := f.Task(...) // _, err := taskqueue.Add(c, t, "") func (f *Function) Call(c context.Context, args ...interface{}) error { t, err := f.Task(args...) if err != nil { return err } _, err = taskqueueAdder(c, t, queue) return err } // Task creates a Task that will invoke the function. // Its parameters may be tweaked before adding it to a queue. // Users should not modify the Path or Payload fields of the returned Task. func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) { if f.err != nil { return nil, fmt.Errorf("delay: func is invalid: %v", f.err) } nArgs := len(args) + 1 // +1 for the context.Context ft := f.fv.Type() minArgs := ft.NumIn() if ft.IsVariadic() { minArgs-- } if nArgs < minArgs { return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs) } if !ft.IsVariadic() && nArgs > minArgs { return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs) } // Check arg types. for i := 1; i < nArgs; i++ { at := reflect.TypeOf(args[i-1]) var dt reflect.Type if i < minArgs { // not a variadic arg dt = ft.In(i) } else { // a variadic arg dt = ft.In(minArgs).Elem() } // nil arguments won't have a type, so they need special handling. if at == nil { // nil interface switch dt.Kind() { case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice: continue // may be nil } return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt) } switch at.Kind() { case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice: av := reflect.ValueOf(args[i-1]) if av.IsNil() { // nil value in interface; not supported by gob, so we replace it // with a nil interface value args[i-1] = nil } } if !at.AssignableTo(dt) { return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt) } } inv := invocation{ Key: f.key, Args: args, } buf := new(bytes.Buffer) if err := gob.NewEncoder(buf).Encode(inv); err != nil { return nil, fmt.Errorf("delay: gob encoding failed: %v", err) } return &taskqueue.Task{ Path: path, Payload: buf.Bytes(), }, nil } var taskqueueAdder = taskqueue.Add // for testing func init() { http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { runFunc(appengine.NewContext(req), w, req) }) } func runFunc(c context.Context, w http.ResponseWriter, req *http.Request) { defer req.Body.Close() var inv invocation if err := gob.NewDecoder(req.Body).Decode(&inv); err != nil { log.Errorf(c, "delay: failed decoding task payload: %v", err) log.Warningf(c, "delay: dropping task") return } f := funcs[inv.Key] if f == nil { log.Errorf(c, "delay: no func with key %q found", inv.Key) log.Warningf(c, "delay: dropping task") return } ft := f.fv.Type() in := []reflect.Value{reflect.ValueOf(c)} for _, arg := range inv.Args { var v reflect.Value if arg != nil { v = reflect.ValueOf(arg) } else { // Task was passed a nil argument, so we must construct // the zero value for the argument here. n := len(in) // we're constructing the nth argument var at reflect.Type if !ft.IsVariadic() || n < ft.NumIn()-1 { at = ft.In(n) } else { at = ft.In(ft.NumIn() - 1).Elem() } v = reflect.Zero(at) } in = append(in, v) } out := f.fv.Call(in) if n := ft.NumOut(); n > 0 && ft.Out(n-1) == errorType { if errv := out[n-1]; !errv.IsNil() { log.Errorf(c, "delay: func failed (will retry): %v", errv.Interface()) w.WriteHeader(http.StatusInternalServerError) return } } }