From 298dd394ed8eff2ad51965369c8bcea246eb823d Mon Sep 17 00:00:00 2001 From: Tristan Swadell Date: Thu, 18 Jun 2020 22:00:23 -0700 Subject: [PATCH] Asynchronous function invocation support --- cel/BUILD.bazel | 1 + cel/cel_test.go | 98 ++++++++++++ cel/env.go | 19 ++- cel/program.go | 185 +++++++++++++++++----- common/types/ref/provider.go | 8 + interpreter/BUILD.bazel | 2 + interpreter/activation.go | 7 +- interpreter/async.go | 244 +++++++++++++++++++++++++++++ interpreter/async_test.go | 203 ++++++++++++++++++++++++ interpreter/functions/functions.go | 31 ++-- interpreter/interpretable.go | 77 +++------ interpreter/interpreter.go | 63 +++++--- interpreter/planner.go | 60 +++++-- test/BUILD.bazel | 5 + test/async.go | 39 +++++ 15 files changed, 893 insertions(+), 149 deletions(-) create mode 100644 interpreter/async.go create mode 100644 interpreter/async_test.go create mode 100644 test/async.go diff --git a/cel/BUILD.bazel b/cel/BUILD.bazel index 73ee4ef0..07518577 100644 --- a/cel/BUILD.bazel +++ b/cel/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//common/types/ref:go_default_library", "//common/types/traits:go_default_library", "//interpreter/functions:go_default_library", + "//test:go_default_library", "//test/proto2pb:go_default_library", "//test/proto3pb:go_default_library", "@io_bazel_rules_go//proto/wkt:descriptor_go_proto", diff --git a/cel/cel_test.go b/cel/cel_test.go index 900cc555..b2c3bf45 100644 --- a/cel/cel_test.go +++ b/cel/cel_test.go @@ -15,11 +15,13 @@ package cel import ( + "context" "fmt" "io/ioutil" "log" "sync" "testing" + "time" "github.com/golang/protobuf/proto" "github.com/google/cel-go/checker/decls" @@ -32,6 +34,7 @@ import ( "github.com/google/cel-go/interpreter" "github.com/google/cel-go/interpreter/functions" "github.com/google/cel-go/parser" + "github.com/google/cel-go/test" descpb "github.com/golang/protobuf/protoc-gen-go/descriptor" proto2pb "github.com/google/cel-go/test/proto2pb" @@ -876,3 +879,98 @@ func Test_CustomInterpreterDecorator(t *testing.T) { t.Errorf("got %v as the last observed constant, wanted 1", lastConst) } } + +func Test_AsyncExtension(t *testing.T) { + env, err := NewEnv( + Declarations( + decls.NewVar("x", decls.String), + decls.NewFunction("asyncEcho", + decls.NewOverload( + "async_echo_string", + []*exprpb.Type{decls.String}, + decls.String)), + ), + ) + if err != nil { + t.Fatal(err) + } + funcs := Functions( + &functions.Overload{ + Operator: "asyncEcho", + Async: test.FakeRPC(25 * time.Millisecond), + }, + &functions.Overload{ + Operator: "async_echo_string", + Async: test.FakeRPC(25 * time.Millisecond), + }, + ) + + tests := []struct { + expr string + parseOnly bool + evalOpts EvalOption + out ref.Val + }{ + { + expr: `asyncEcho(x)`, + out: types.String("async echo success!"), + }, + { + expr: `asyncEcho(x)`, + parseOnly: true, + out: types.String("async echo success!"), + }, + { + expr: `asyncEcho(x)`, + evalOpts: OptOptimize, + out: types.String("async echo success!"), + }, + { + expr: `asyncEcho(x) == 'async echo success!'`, + evalOpts: OptOptimize | OptTrackState, + out: types.True, + }, + { + expr: `asyncEcho(x) == 'async echo success!' || true`, + evalOpts: OptOptimize | OptTrackState, + out: types.True, + }, + } + for i, tst := range tests { + tc := tst + t.Run(fmt.Sprintf("%d", i), func(tt *testing.T) { + var ast *Ast + var iss *Issues + if tc.parseOnly { + ast, iss = env.Parse(tc.expr) + } else { + ast, iss = env.Compile(tc.expr) + } + if iss.Err() != nil { + tt.Fatal(iss.Err()) + } + opts := []ProgramOption{funcs} + if tc.evalOpts != 0 { + opts = append(opts, EvalOptions(tc.evalOpts)) + } + prg, err := env.AsyncProgram(ast, opts...) + if err != nil { + tt.Fatal(err) + } + ctx := context.TODO() + out, det, err := prg.AsyncEval(ctx, map[string]interface{}{ + "x": "async echo", + }) + if err != nil { + tt.Fatal(err) + } + if out.Equal(tc.out) != types.True { + tt.Errorf("got %v, wanted %v", out, tc.out) + } + if tc.evalOpts&OptTrackState == OptTrackState && det == nil { + tt.Error("details was nil, expected non-nil") + } + }) + } + +} diff --git a/cel/env.go b/cel/env.go index 93dd2582..f5a2e9ed 100644 --- a/cel/env.go +++ b/cel/env.go @@ -307,14 +307,13 @@ func (e *Env) ParseSource(src common.Source) (*Ast, *Issues) { // Program generates an evaluable instance of the Ast within the environment (Env). func (e *Env) Program(ast *Ast, opts ...ProgramOption) (Program, error) { - optSet := e.progOpts - if len(opts) != 0 { - mergedOpts := []ProgramOption{} - mergedOpts = append(mergedOpts, e.progOpts...) - mergedOpts = append(mergedOpts, opts...) - optSet = mergedOpts - } - return newProgram(e, ast, optSet) + return e.newProgram(ast, opts /* async= */, false) +} + +// AsyncProgram generates an evaluable instance of the Ast with support for asynchronous extension +// functions. +func (e *Env) AsyncProgram(ast *Ast, opts ...ProgramOption) (AsyncProgram, error) { + return e.newProgram(ast, opts /* async= */, true) } // SetFeature sets the given feature flag, as enumerated in options.go. @@ -427,8 +426,8 @@ func (i *Issues) Err() error { if i == nil { return nil } - if len(i.errs.GetErrors()) > 0 { - return errors.New(i.errs.ToDisplayString()) + if len(i.Errors()) > 0 { + return errors.New(i.String()) } return nil } diff --git a/cel/program.go b/cel/program.go index 0e6a7bc7..4430f63f 100644 --- a/cel/program.go +++ b/cel/program.go @@ -15,6 +15,8 @@ package cel import ( + "context" + "errors" "fmt" "github.com/google/cel-go/common/types" @@ -28,7 +30,7 @@ import ( type Program interface { // Eval returns the result of an evaluation of the Ast and environment against the input vars. // - // The vars value may either be an `interpreter.Activation` or a `map[string]interface{}`. + // The argument value may either be an `interpreter.Activation` or a `map[string]interface{}`. // // If the `OptTrackState` or `OptExhaustiveEval` flags are used, the `details` response will // be non-nil. Given this caveat on `details`, the return state from evaluation will be: @@ -36,11 +38,23 @@ type Program interface { // * `val`, `details`, `nil` - Successful evaluation of a non-error result. // * `val`, `details`, `err` - Successful evaluation to an error result. // * `nil`, `details`, `err` - Unsuccessful evaluation. + Eval(interface{}) (ref.Val, *EvalDetails, error) +} + +// AsyncProgram is an evaluable view of an Ast which may contain asynchronous compute. +type AsyncProgram interface { + // AsyncEval returns the result of an evaluation of the Ast against a given input. // - // An unsuccessful evaluation is typically the result of a series of incompatible `EnvOption` - // or `ProgramOption` values used in the creation of the evaluation environment or executable - // program. - Eval(vars interface{}) (ref.Val, *EvalDetails, error) + // The input arguments (apart from Context) and return values for AsyncEval mirror those from + // the standard Eval call. + AsyncEval(context.Context, interface{}) (ref.Val, *EvalDetails, error) +} + +// programWrapper embeds both the Program and AsyncProgram interface, but in practice only one +// interface is exposed through the top-level 'cel' package exports. +type programWrapper struct { + Program + AsyncProgram } // NoVars returns an empty Activation. @@ -92,17 +106,19 @@ func (ed *EvalDetails) State() interpreter.EvalState { // prog is the internal implementation of the Program interface. type prog struct { *Env - evalOpts EvalOption - decorators []interpreter.InterpretableDecorator - defaultVars interpreter.Activation - dispatcher interpreter.Dispatcher - interpreter interpreter.Interpreter - interpretable interpreter.Interpretable - attrFactory interpreter.AttributeFactory + async bool + evalOpts EvalOption + decorators []interpreter.InterpretableDecorator + defaultVars interpreter.Activation + dispatcher interpreter.Dispatcher + interpreter *interpreter.Interpreter + interpretable interpreter.Interpretable + asyncInterpretable interpreter.AsyncInterpretable + attrFactory interpreter.AttributeFactory } // progFactory is a helper alias for marking a program creation factory function. -type progFactory func(interpreter.EvalState) (Program, error) +type progFactory func(interpreter.EvalState) (*programWrapper, error) // progGen holds a reference to a progFactory instance and implements the Program interface. type progGen struct { @@ -113,7 +129,16 @@ type progGen struct { // ProgramOption values. // // If the program cannot be configured the prog will be nil, with a non-nil error response. -func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { +func (e *Env) newProgram(ast *Ast, + opts []ProgramOption, + async bool) (*programWrapper, error) { + optSet := e.progOpts + if len(opts) != 0 { + mergedOpts := []ProgramOption{} + mergedOpts = append(mergedOpts, e.progOpts...) + mergedOpts = append(mergedOpts, opts...) + optSet = mergedOpts + } // Build the dispatcher, interpreter, and default program value. disp := interpreter.NewDispatcher() @@ -123,11 +148,12 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { Env: e, decorators: []interpreter.InterpretableDecorator{}, dispatcher: disp, + async: async, } // Configure the program via the ProgramOption values. var err error - for _, opt := range opts { + for _, opt := range optSet { if opt == nil { return nil, fmt.Errorf("program options should be non-nil") } @@ -159,12 +185,13 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { if p.evalOpts&OptExhaustiveEval == OptExhaustiveEval { // State tracking requires that each Eval() call operate on an isolated EvalState // object; hence, the presence of the factory. - factory := func(state interpreter.EvalState) (Program, error) { + factory := func(state interpreter.EvalState) (*programWrapper, error) { decs := append(decorators, interpreter.ExhaustiveEval(state)) clone := &prog{ evalOpts: p.evalOpts, defaultVars: p.defaultVars, Env: e, + async: p.async, dispatcher: disp, interpreter: interp} return initInterpretable(clone, ast, decs) @@ -174,12 +201,13 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { // Enable state tracking last since it too requires the factory approach but is less // featured than the ExhaustiveEval decorator. if p.evalOpts&OptTrackState == OptTrackState { - factory := func(state interpreter.EvalState) (Program, error) { + factory := func(state interpreter.EvalState) (*programWrapper, error) { decs := append(decorators, interpreter.TrackState(state)) clone := &prog{ evalOpts: p.evalOpts, defaultVars: p.defaultVars, Env: e, + async: p.async, dispatcher: disp, interpreter: interp} return initInterpretable(clone, ast, decs) @@ -191,31 +219,37 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { // initProgGen tests the factory object by calling it once and returns a factory-based Program if // the test is successful. -func initProgGen(factory progFactory) (Program, error) { +func initProgGen(factory progFactory) (*programWrapper, error) { // Test the factory to make sure that configuration errors are spotted at config _, err := factory(interpreter.NewEvalState()) if err != nil { return nil, err } - return &progGen{factory: factory}, nil + pg := &progGen{factory: factory} + wrapper := &programWrapper{Program: pg, AsyncProgram: pg} + return wrapper, nil } // initIterpretable creates a checked or unchecked interpretable depending on whether the Ast // has been run through the type-checker. -func initInterpretable( - p *prog, +func initInterpretable(p *prog, ast *Ast, - decorators []interpreter.InterpretableDecorator) (Program, error) { + decorators []interpreter.InterpretableDecorator) (*programWrapper, error) { var err error // Unchecked programs do not contain type and reference information and may be // slower to execute than their checked counterparts. if !ast.IsChecked() { - p.interpretable, err = - p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...) + if p.async { + p.asyncInterpretable, err = + p.interpreter.NewAsyncUncheckedInterpretable(ast.Expr(), decorators...) + } else { + p.interpretable, err = + p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...) + } if err != nil { return nil, err } - return p, nil + return &programWrapper{Program: p, AsyncProgram: p}, nil } // When the AST has been checked it contains metadata that can be used to speed up program // execution. @@ -224,16 +258,25 @@ func initInterpretable( if err != nil { return nil, err } - p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...) + if p.async { + p.asyncInterpretable, err = + p.interpreter.NewAsyncInterpretable(checked, decorators...) + } else { + p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...) + } if err != nil { return nil, err } - - return p, nil + return &programWrapper{Program: p, AsyncProgram: p}, nil } // Eval implements the Program interface method. func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error) { + // In general this should never happen, since only one view (sync, async) is returned back to + // the caller. + if p.interpretable == nil { + return nil, nil, errors.New("async program invoked synchronously") + } // Configure error recovery for unexpected panics during evaluation. Note, the use of named // return values makes it possible to modify the error response during the recovery // function. @@ -242,16 +285,39 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error) err = fmt.Errorf("internal error: %v", r) } }() - // Build a hierarchical activation if there are default vars set. - vars, err := interpreter.NewActivation(input) + vars, err := p.vars(input) + v = p.interpretable.Eval(vars) + // The output of an internal Eval may have a value (`v`) that is a types.Err. This step + // translates the CEL value to a Go error response. This interface does not quite match the + // RPC signature which allows for multiple errors to be returned, but should be sufficient. + if types.IsError(v) { + err = v.Value().(error) + } + return +} + +// AsyncEval implements the AsyncProgram interface method. +func (p *prog) AsyncEval(ctx context.Context, + input interface{}) (v ref.Val, det *EvalDetails, err error) { + // In general this should never happen, since only one view (sync, async) is returned back to + // the caller. + if p.asyncInterpretable == nil { + return nil, nil, errors.New("sync program invoked asynchronously") + } + // Configure error recovery for unexpected panics during evaluation. Note, the use of named + // return values makes it possible to modify the error response during the recovery + // function. + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("internal error: %v", r) + } + }() + asyncVars, err := p.asyncVars(input) if err != nil { return } - if p.defaultVars != nil { - vars = interpreter.NewHierarchicalActivation(p.defaultVars, vars) - } - v = p.interpretable.Eval(vars) - // The output of an internal Eval may have a value (`v`) that is a types.Err. This step + v = p.asyncInterpretable.AsyncEval(ctx, asyncVars) + // The output of an internal AsyncEval may have a value (`v`) that is a types.Err. This step // translates the CEL value to a Go error response. This interface does not quite match the // RPC signature which allows for multiple errors to be returned, but should be sufficient. if types.IsError(v) { @@ -260,6 +326,30 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error) return } +// asyncVars creates an AsyncActivation suitable for tracking async invocations in a manner which +// can be used to orchestrate async calls needed to complete expression evaluation. +func (p *prog) asyncVars(input interface{}) (*interpreter.AsyncActivation, error) { + vars, err := p.vars(input) + if err != nil { + return nil, err + } + return interpreter.NewAsyncActivation(vars), nil +} + +// vars creates an Activation from the input, and if applicable extends the set of default values +// configured via ProgramOptions. +func (p *prog) vars(input interface{}) (interpreter.Activation, error) { + vars, err := interpreter.NewActivation(input) + if err != nil { + return nil, err + } + // Build a hierarchical activation if there are default vars set. + if p.defaultVars != nil { + return interpreter.NewHierarchicalActivation(p.defaultVars, vars), nil + } + return vars, nil +} + // Eval implements the Program interface method. func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) { // The factory based Eval() differs from the standard evaluation model in that it generates a @@ -283,3 +373,28 @@ func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) { } return v, det, nil } + +// AsyncEval implements the AsyncProgram interface method. +func (gen *progGen) AsyncEval(ctx context.Context, + input interface{}) (ref.Val, *EvalDetails, error) { + // The factory based AsyncEval() differs from the standard evaluation model in that it + // generates a new EvalState instance for each call to ensure that unique evaluations yield + // unique stateful results. + state := interpreter.NewEvalState() + det := &EvalDetails{state: state} + + // Generate a new instance of the interpretable using the factory configured during the call to + // newProgram(). It is incredibly unlikely that the factory call will generate an error given + // the factory test performed within the Program() call. + p, err := gen.factory(state) + if err != nil { + return nil, det, err + } + + // Evaluate the input, returning the result and the 'state' within EvalDetails. + v, _, err := p.AsyncEval(ctx, input) + if err != nil { + return v, det, err + } + return v, det, nil +} diff --git a/common/types/ref/provider.go b/common/types/ref/provider.go index 541dbdbf..70c17ac9 100644 --- a/common/types/ref/provider.go +++ b/common/types/ref/provider.go @@ -101,3 +101,11 @@ type FieldTester func(target interface{}) bool // FieldGetter is used to get the field value from an input object, if set. type FieldGetter func(target interface{}) (interface{}, error) + +// Resolver abstracts variable and type identifier resolution behind a single interface +// method. +type Resolver interface { + // ResolveName returns the value associated with the given fully qualified name, if + // present. + ResolveName(name string) (interface{}, bool) +} diff --git a/interpreter/BUILD.bazel b/interpreter/BUILD.bazel index 09b7a45d..0c9f5a2c 100644 --- a/interpreter/BUILD.bazel +++ b/interpreter/BUILD.bazel @@ -9,6 +9,7 @@ go_library( name = "go_default_library", srcs = [ "activation.go", + "async.go", "attributes.go", "attribute_patterns.go", "decorators.go", @@ -42,6 +43,7 @@ go_test( name = "go_default_test", srcs = [ "activation_test.go", + "async_test.go", "attributes_test.go", "attribute_patterns_test.go", "interpreter_test.go", diff --git a/interpreter/activation.go b/interpreter/activation.go index 7eb22f74..566994db 100644 --- a/interpreter/activation.go +++ b/interpreter/activation.go @@ -37,9 +37,7 @@ type Activation interface { // EmptyActivation returns a variable free activation. func EmptyActivation() Activation { - // This call cannot fail. - a, _ := NewActivation(map[string]interface{}{}) - return a + return emptyActivation } // NewActivation returns an activation based on a map-based binding where the map keys are @@ -197,6 +195,9 @@ func (v *varActivation) ResolveName(name string) (interface{}, bool) { } var ( + // emptyActivation is a singleton activation which provides no input + emptyActivation = &mapActivation{bindings: map[string]interface{}{}} + // pool of var activations to reduce allocations during folds. varActivationPool = &sync.Pool{ New: func() interface{} { diff --git a/interpreter/async.go b/interpreter/async.go new file mode 100644 index 00000000..b2a89973 --- /dev/null +++ b/interpreter/async.go @@ -0,0 +1,244 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interpreter + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/interpreter/functions" +) + +// AsyncInterpretable supports evaluation with async extension functions. +type AsyncInterpretable interface { + // ID of the interpretable expression. + ID() int64 + + // AsyncEval drives the evaluation of an Interpretable program which may invoke asynchronous + // calls. + AsyncEval(context.Context, *AsyncActivation) ref.Val +} + +// asyncEval is the default implementation of the AsyncInterpretable interface. +type asyncEval struct { + Interpretable +} + +// AsyncEval implements the AsyncInterpretable interface method and applies the following +// algorithm: +// +// - Evaluate synchronously. If the result is not of types.Unkonwn, return the result. +// - Otherwise for each unknown value determine if the expression id maps to the expression id +// of an async call. +// - For each async call and unique argument set found, invoke the async implementation. +// - Async call results are memoized with their corresponding argument sets. +// - Revaluate synchronously. Repeat if unknowns are still present and progress (at least one +// async call invocation has been made). +// +// The process of incremental evaluation may be strictly more expensive than some alternative +// approaches; however, the async call is expected to dominate the execution time by several +// orders of magnitude. +func (async *asyncEval) AsyncEval(ctx context.Context, vars *AsyncActivation) ref.Val { + res := async.Interpretable.Eval(vars) + for types.IsUnknown(res) { + progress := false + unk := res.(types.Unknown) + for _, id := range unk { + call, found := vars.findCall(id) + if !found { + continue + } + call.evals.Range(func(idx, rawArgs interface{}) bool { + asyncResult, hasResult := call.results.Load(idx) + if hasResult { + return true + } + progress = true + args := rawArgs.([]ref.Val) + asyncResult = call.impl(ctx, vars, args) + call.results.Store(idx, asyncResult) + return false + }) + } + if !progress { + break + } + res = async.Interpretable.Eval(vars) + } + return res +} + +// NewAsyncActivation returns an AsyncActivation capable of tracking async calls relevant to a +// single evaluation pass through an AsyncInterpretable object. +// +func NewAsyncActivation(vars Activation) *AsyncActivation { + return &AsyncActivation{Activation: vars} +} + +// AsyncActivation tracks calls by expression identifier and overload name for use in subsequent +// async function invocations when the result cannot be computed without context provided by the +// remote call. +// +// Note, this object is concurrency safe, but should not be reused across invocations. +type AsyncActivation struct { + Activation + asyncCalls sync.Map // map[int64]*asyncCall +} + +// findCall returns an async call matching the given expression id, if any. +func (act *AsyncActivation) findCall(id int64) (*asyncCall, bool) { + call, found := act.asyncCalls.Load(id) + if found { + return call.(*asyncCall), true + } + return nil, false +} + +// findCallbyOverload returns an async call matching the given overload identifer, if any. +// +// Note, the same overload may be referenced in multiple locations within the expression. +// This call will return the first instance of the call evaluated for the purpose of aggregating +// related arugments sets to the underlying function and deduping them. +func (act *AsyncActivation) findCallByOverload(overload string) (*asyncCall, bool) { + var call *asyncCall + act.asyncCalls.Range(func(_, ac interface{}) bool { + asyncCall := ac.(*asyncCall) + if asyncCall.overload == overload { + call = asyncCall + return false + } + return true + }) + if call != nil { + return call, true + } + return nil, false +} + +// putCall stores the asyncCall by its expression id. +func (act *AsyncActivation) putCall(id int64, call *asyncCall) { + act.asyncCalls.Store(id, call) +} + +// newAsyncCall creates a new asyncCall instance capable of tracking argument sets to the call. +func newAsyncCall(id int64, overload string, impl functions.AsyncOp) *asyncCall { + var idx int32 + return &asyncCall{ + id: id, + overload: overload, + impl: impl, + nextCallIdx: &idx, + } +} + +// asyncCall tracks argument sets and associated results for a given async call. +type asyncCall struct { + id int64 + impl functions.AsyncOp + overload string + nextCallIdx *int32 + evals sync.Map // [][]ref.Val + results sync.Map // []ref.Val +} + +// Eval implements the Interpretable interface method, tracks argument sets, and returns memoized +// results, if present. +func (ac *asyncCall) Eval(args []ref.Val) ref.Val { + argsFound := false + idx := atomic.LoadInt32(ac.nextCallIdx) + ac.evals.Range(func(i, rawArgs interface{}) bool { + asyncArgs := rawArgs.([]ref.Val) + for j, arg := range args { + if arg.Equal(asyncArgs[j]) != types.True { + return true + } + } + idx = i.(int32) + argsFound = true + return false + }) + val, resultFound := ac.results.Load(idx) + // args found and there's a result, return it. + if resultFound { + return val.(ref.Val) + } + // args found, but no result yet, return unknown. + if argsFound { + return types.Unknown{ac.id} + } + // Args not tracked, track them here. + swapped := false + for !swapped { + swapped = atomic.CompareAndSwapInt32(ac.nextCallIdx, idx, idx+1) + idx++ + } + ac.evals.Store(idx, args) + return types.Unknown{ac.id} +} + +// evalAsyncCall implements the InterpretableCall interface and is the entry point into async +// function evaluation coordination. +type evalAsyncCall struct { + id int64 + function string + overload string + args []Interpretable + impl functions.AsyncOp +} + +// ID returns the expression identifier where the call is referenced. +func (async *evalAsyncCall) ID() int64 { + return async.id +} + +// Function implements the InterpretableCall interface method. +func (async *evalAsyncCall) Function() string { + return async.function +} + +// OverloadID implements the InterpretableCall interface method. +func (async *evalAsyncCall) OverloadID() string { + return async.overload +} + +// Args returns the argument to the unary function. +func (async *evalAsyncCall) Args() []Interpretable { + return async.args +} + +// Eval tracks the arguments provided to the underlying async invocation, deduping argument sets +// if possible. +func (async *evalAsyncCall) Eval(vars Activation) ref.Val { + argVals := make([]ref.Val, len(async.args), len(async.args)) + // Early return if any argument to the function is unknown or error. + for i, arg := range async.args { + argVals[i] = arg.Eval(vars) + if types.IsUnknownOrError(argVals[i]) { + return argVals[i] + } + } + // Attempt to return the result if one exists. Note, this can be pretty expensive, + // but presumably cheaper than actually invoking the async call. + asyncVars := vars.(*AsyncActivation) + calls, found := asyncVars.findCallByOverload(async.overload) + if !found { + calls = newAsyncCall(async.id, async.overload, async.impl) + } + asyncVars.putCall(async.id, calls) + return calls.Eval(argVals) +} diff --git a/interpreter/async_test.go b/interpreter/async_test.go new file mode 100644 index 00000000..e3089133 --- /dev/null +++ b/interpreter/async_test.go @@ -0,0 +1,203 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interpreter + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/test" +) + +func TestAsyncEval_CallTracking(t *testing.T) { + var tests = []struct { + lhs ref.Val + lhsOverload string + lhsTimeout time.Duration + rhs ref.Val + rhsOverload string + rhsTimeout time.Duration + in map[string]interface{} + out ref.Val + }{ + { + lhs: types.String("x success!"), + lhsTimeout: 50 * time.Millisecond, + lhsOverload: "same_async", + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{"x": "x", "y": "y"}, + out: types.True, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 50 * time.Millisecond, + lhsOverload: "same_async", + rhs: types.String("y success!"), + rhsTimeout: 1 * time.Millisecond, + rhsOverload: "alt_async", + in: map[string]interface{}{"x": "x", "y": "y"}, + out: types.True, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 1 * time.Millisecond, + lhsOverload: "alt_async", + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{"x": "x", "y": "y"}, + out: types.True, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 1 * time.Millisecond, + lhsOverload: "alt_async", + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{"x": "x", "y": "not y"}, + out: types.NewErr("context deadline exceeded"), + }, + { + lhs: types.String("x success!"), + rhs: types.String("y success!"), + lhsTimeout: 1 * time.Millisecond, + rhsTimeout: 1 * time.Millisecond, + in: map[string]interface{}{"x": "x", "y": "y"}, + out: types.NewErr("context deadline exceeded"), + }, + { + lhs: types.Unknown{42}, + lhsTimeout: 50 * time.Millisecond, + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + in: map[string]interface{}{"x": "x", "y": "y"}, + out: types.True, + }, + { + lhs: types.Unknown{42}, + lhsTimeout: 50 * time.Millisecond, + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + in: map[string]interface{}{"x": "x", "y": "z"}, + out: types.Unknown{42}, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 50 * time.Millisecond, + lhsOverload: "same_async", + rhs: types.String("x success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{"x": "x", "y": "x"}, + out: types.True, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 50 * time.Millisecond, + lhsOverload: "same_async", + rhs: types.String(" success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{ + "x": types.Unknown{42}, + "y": "not y", + }, + out: types.Unknown{42}, + }, + { + lhs: types.String("x success!"), + lhsTimeout: 50 * time.Millisecond, + lhsOverload: "same_async", + rhs: types.String("y success!"), + rhsTimeout: 50 * time.Millisecond, + rhsOverload: "same_async", + in: map[string]interface{}{ + "x": types.Unknown{42}, + "y": "y", + }, + out: types.True, + }, + } + xVar := &absoluteAttribute{ + id: 2, + namespaceNames: []string{"x"}, + } + yVar := &absoluteAttribute{ + id: 4, + namespaceNames: []string{"y"}, + } + + for i, tst := range tests { + tc := tst + t.Run(fmt.Sprintf("%d", i), func(tt *testing.T) { + testX := makeTestEq(6, + tc.lhsOverload, xVar, NewConstValue(8, tc.lhs), tc.lhsTimeout) + testY := makeTestEq(10, + tc.rhsOverload, yVar, NewConstValue(12, tc.rhs), tc.rhsTimeout) + logic := &evalOr{ + id: 14, + lhs: testX, + rhs: testY, + } + async := &asyncEval{Interpretable: logic} + in, err := NewActivation(tc.in) + if err != nil { + tt.Fatal(err) + } + vars := NewAsyncActivation(in) + ctx := context.TODO() + out := async.AsyncEval(ctx, vars) + if !reflect.DeepEqual(out, tc.out) { + tt.Errorf("got %v, wanted %v", out, tc.out) + } + outCached := async.AsyncEval(ctx, vars) + if !reflect.DeepEqual(out, outCached) { + tt.Errorf("got %v, wanted %v", outCached, out) + } + }) + } +} + +func makeTestEq(id int64, + overload string, + arg Attribute, + value Interpretable, + timeout time.Duration) Interpretable { + test := &evalAsyncCall{ + id: id, + function: "asyncEcho", + overload: overload, + args: []Interpretable{ + &evalAttr{ + attr: arg, + adapter: types.DefaultTypeAdapter, + }, + }, + impl: test.FakeRPC(timeout), + } + return &evalEq{ + id: id + 1, + lhs: test, + rhs: value, + } +} diff --git a/interpreter/functions/functions.go b/interpreter/functions/functions.go index 4ca706e9..a1affee3 100644 --- a/interpreter/functions/functions.go +++ b/interpreter/functions/functions.go @@ -16,7 +16,11 @@ // interpreter and as declared within the checker#StandardDeclarations. package functions -import "github.com/google/cel-go/common/types/ref" +import ( + "context" + + "github.com/google/cel-go/common/types/ref" +) // Overload defines a named overload of a function, indicating an operand trait // which must be present on the first argument to the overload as well as one @@ -27,8 +31,7 @@ import "github.com/google/cel-go/common/types/ref" // types with operator overloads. Any added complexity is assumed to be handled // by the generic FunctionOp. type Overload struct { - // Operator name as written in an expression or defined within - // operators.go. + // Operator name as written in an expression or defined within operators.go. Operator string // Operand trait used to dispatch the call. The zero-value indicates a @@ -42,17 +45,21 @@ type Overload struct { // Binary defines the overload with a BinaryOp implementation. May be nil. Binary BinaryOp - // Function defines the overload with a FunctionOp implementation. May be - // nil. + // Function defines the overload with a FunctionOp implementation. May be nil. Function FunctionOp + + // Async defines an overload with an AsyncOp implementation. May be nil. + Async AsyncOp } -// UnaryOp is a function that takes a single value and produces an output. -type UnaryOp func(value ref.Val) ref.Val +// UnaryOp is a function that takes a single argument. +type UnaryOp func(ref.Val) ref.Val + +// BinaryOp is a function that takes two arguments. +type BinaryOp func(ref.Val, ref.Val) ref.Val -// BinaryOp is a function that takes two values and produces an output. -type BinaryOp func(lhs ref.Val, rhs ref.Val) ref.Val +// FunctionOp is a function with accepts zero or more arguments. +type FunctionOp func(...ref.Val) ref.Val -// FunctionOp is a function with accepts zero or more arguments and produces -// an value (as interface{}) or error as a result. -type FunctionOp func(values ...ref.Val) ref.Val +// AsyncOp is an asynchronous function which accepts a Context value and arguments. +type AsyncOp func(context.Context, ref.Resolver, []ref.Val) ref.Val diff --git a/interpreter/interpretable.go b/interpreter/interpretable.go index f349a6bc..296bdd91 100644 --- a/interpreter/interpretable.go +++ b/interpreter/interpretable.go @@ -31,7 +31,7 @@ type Interpretable interface { ID() int64 // Eval an Activation to produce an output. - Eval(activation Activation) ref.Val + Eval(Activation) ref.Val } // InterpretableConst interface for tracking whether the Interpretable is a constant value. @@ -184,19 +184,7 @@ func (or *evalOr) Eval(ctx Activation) ref.Val { if lok && rok { return types.False } - // TODO: return both values as a set if both are unknown or error. - // prefer left unknown to right unknown. - if types.IsUnknown(lVal) { - return lVal - } - if types.IsUnknown(rVal) { - return rVal - } - // If the left-hand side is non-boolean return it as the error. - if types.IsError(lVal) { - return lVal - } - return types.ValOrErr(rVal, "no such overload") + return logicallyMergeUnkErr("||", lVal, rVal) } type evalAnd struct { @@ -228,19 +216,7 @@ func (and *evalAnd) Eval(ctx Activation) ref.Val { if lok && rok { return types.True } - // TODO: return both values as a set if both are unknown or error. - // prefer left unknown to right unknown. - if types.IsUnknown(lVal) { - return lVal - } - if types.IsUnknown(rVal) { - return rVal - } - // If the left-hand side is non-boolean return it as the error. - if types.IsError(lVal) { - return lVal - } - return types.ValOrErr(rVal, "no such overload") + return logicallyMergeUnkErr("&&", lVal, rVal) } type evalEq struct { @@ -735,18 +711,7 @@ func (or *evalExhaustiveOr) Eval(ctx Activation) ref.Val { if lok && rok { return types.False } - if types.IsUnknown(lVal) { - return lVal - } - if types.IsUnknown(rVal) { - return rVal - } - // TODO: Combine the errors into a set in the future. - // If the left-hand side is non-boolean return it as the error. - if types.IsError(lVal) { - return lVal - } - return types.ValOrErr(rVal, "no such overload") + return logicallyMergeUnkErr("||", lVal, rVal) } // evalExhaustiveAnd is just like evalAnd, but does not short-circuit argument evaluation. @@ -776,18 +741,7 @@ func (and *evalExhaustiveAnd) Eval(ctx Activation) ref.Val { if lok && rok { return types.True } - if types.IsUnknown(lVal) { - return lVal - } - if types.IsUnknown(rVal) { - return rVal - } - // TODO: Combine the errors into a set in the future. - // If the left-hand side is non-boolean return it as the error. - if types.IsError(lVal) { - return lVal - } - return types.ValOrErr(rVal, "no such overload") + return logicallyMergeUnkErr("&&", lVal, rVal) } // evalExhaustiveConditional is like evalConditional, but does not short-circuit argument @@ -907,3 +861,24 @@ func (a *evalAttr) AddQualifier(qual Qualifier) (InterpretableAttribute, error) _, err := a.attr.AddQualifier(qual) return a, err } + +func logicallyMergeUnkErr(op string, value, other ref.Val) ref.Val { + vUnk, vIsUnk := value.(types.Unknown) + oUnk, oIsUnk := other.(types.Unknown) + if vIsUnk && oIsUnk { + return append(vUnk, oUnk...) + } + if vIsUnk { + return vUnk + } + if oIsUnk { + return oUnk + } + if types.IsError(value) { + return value + } + if types.IsError(other) { + return other + } + return types.NewErr("no such overload: %v %s %v", value, op, other) +} diff --git a/interpreter/interpreter.go b/interpreter/interpreter.go index 7dacb885..0260467e 100644 --- a/interpreter/interpreter.go +++ b/interpreter/interpreter.go @@ -25,19 +25,6 @@ import ( exprpb "google.golang.org/genproto/googleapis/api/expr/v1alpha1" ) -// Interpreter generates a new Interpretable from a checked or unchecked expression. -type Interpreter interface { - // NewInterpretable creates an Interpretable from a checked expression and an - // optional list of InterpretableDecorator values. - NewInterpretable(checked *exprpb.CheckedExpr, - decorators ...InterpretableDecorator) (Interpretable, error) - - // NewUncheckedInterpretable returns an Interpretable from a parsed expression - // and an optional list of InterpretableDecorator values. - NewUncheckedInterpretable(expr *exprpb.Expr, - decorators ...InterpretableDecorator) (Interpretable, error) -} - // TrackState decorates each expression node with an observer which records the value // associated with the given expression id. EvalState must be provided to the decorator. // This decorator is not thread-safe, and the EvalState must be reset between Eval() @@ -73,7 +60,8 @@ func Optimize() InterpretableDecorator { return decOptimize() } -type exprInterpreter struct { +// Interpreter generates a new Interpretable from a checked or unchecked expression. +type Interpreter struct { dispatcher Dispatcher packager packages.Packager provider ref.TypeProvider @@ -86,8 +74,8 @@ type exprInterpreter struct { func NewInterpreter(dispatcher Dispatcher, packager packages.Packager, provider ref.TypeProvider, adapter ref.TypeAdapter, - attrFactory AttributeFactory) Interpreter { - return &exprInterpreter{ + attrFactory AttributeFactory) *Interpreter { + return &Interpreter{ dispatcher: dispatcher, packager: packager, provider: provider, @@ -100,14 +88,15 @@ func NewInterpreter(dispatcher Dispatcher, packager packages.Packager, func NewStandardInterpreter(packager packages.Packager, provider ref.TypeProvider, adapter ref.TypeAdapter, - resolver AttributeFactory) Interpreter { + resolver AttributeFactory) *Interpreter { dispatcher := NewDispatcher() dispatcher.Add(functions.StandardOverloads()...) return NewInterpreter(dispatcher, packager, provider, adapter, resolver) } -// NewIntepretable implements the Interpreter interface method. -func (i *exprInterpreter) NewInterpretable( +// NewInterpretable creates an Interpretable from a checked expression and an +// optional list of InterpretableDecorator values. +func (i *Interpreter) NewInterpretable( checked *exprpb.CheckedExpr, decorators ...InterpretableDecorator) (Interpretable, error) { p := newPlanner( @@ -121,8 +110,25 @@ func (i *exprInterpreter) NewInterpretable( return p.Plan(checked.GetExpr()) } -// NewUncheckedIntepretable implements the Interpreter interface method. -func (i *exprInterpreter) NewUncheckedInterpretable( +// NewAsyncInterpretable creates an CEL program from a type-checked expression which +// supports asynchronous extension functions. +func (i *Interpreter) NewAsyncInterpretable( + checked *exprpb.CheckedExpr, + decorators ...InterpretableDecorator) (AsyncInterpretable, error) { + p := newPlanner( + i.dispatcher, + i.provider, + i.adapter, + i.attrFactory, + i.packager, + checked, + decorators...) + return p.AsyncPlan(checked.GetExpr()) +} + +// NewUncheckedInterpretable returns an Interpretable from a parsed expression +// and an optional list of InterpretableDecorator values. +func (i *Interpreter) NewUncheckedInterpretable( expr *exprpb.Expr, decorators ...InterpretableDecorator) (Interpretable, error) { p := newUncheckedPlanner( @@ -134,3 +140,18 @@ func (i *exprInterpreter) NewUncheckedInterpretable( decorators...) return p.Plan(expr) } + +// NewAsyncUncheckedInterpretable creates an CEL program from a parse-only expression which +// supports asynchronous extension functions. +func (i *Interpreter) NewAsyncUncheckedInterpretable( + expr *exprpb.Expr, + decorators ...InterpretableDecorator) (AsyncInterpretable, error) { + p := newUncheckedPlanner( + i.dispatcher, + i.provider, + i.adapter, + i.attrFactory, + i.packager, + decorators...) + return p.AsyncPlan(expr) +} diff --git a/interpreter/planner.go b/interpreter/planner.go index d3da79ab..998f5437 100644 --- a/interpreter/planner.go +++ b/interpreter/planner.go @@ -27,23 +27,17 @@ import ( exprpb "google.golang.org/genproto/googleapis/api/expr/v1alpha1" ) -// interpretablePlanner creates an Interpretable evaluation plan from a proto Expr value. -type interpretablePlanner interface { - // Plan generates an Interpretable value (or error) from the input proto Expr. - Plan(expr *exprpb.Expr) (Interpretable, error) -} - -// newPlanner creates an interpretablePlanner which references a Dispatcher, TypeProvider, -// TypeAdapter, Packager, and CheckedExpr value. These pieces of data are used to resolve -// functions, types, and namespaced identifiers at plan time rather than at runtime since -// it only needs to be done once and may be semi-expensive to compute. +// newPlanner creates a planner which references a Dispatcher, TypeProvider, TypeAdapter, +// Packager, and CheckedExpr value. These pieces of data are used to resolve functions, +// types, and namespaced identifiers at plan time rather than at runtime since it only +// needs to be done once and may be semi-expensive to compute. func newPlanner(disp Dispatcher, provider ref.TypeProvider, adapter ref.TypeAdapter, attrFactory AttributeFactory, pkg packages.Packager, checked *exprpb.CheckedExpr, - decorators ...InterpretableDecorator) interpretablePlanner { + decorators ...InterpretableDecorator) *planner { return &planner{ disp: disp, provider: provider, @@ -56,15 +50,15 @@ func newPlanner(disp Dispatcher, } } -// newUncheckedPlanner creates an interpretablePlanner which references a Dispatcher, TypeProvider, -// TypeAdapter, and Packager to resolve functions and types at plan time. Namespaces present in -// Select expressions are resolved lazily at evaluation time. +// newUncheckedPlanner creates a planner which references a Dispatcher, TypeProvider, TypeAdapter, +// and Packager to resolve functions and types at plan time. Namespaces present in Select +// expressions are resolved lazily at evaluation time. func newUncheckedPlanner(disp Dispatcher, provider ref.TypeProvider, adapter ref.TypeAdapter, attrFactory AttributeFactory, pkg packages.Packager, - decorators ...InterpretableDecorator) interpretablePlanner { + decorators ...InterpretableDecorator) *planner { return &planner{ disp: disp, provider: provider, @@ -77,7 +71,7 @@ func newUncheckedPlanner(disp Dispatcher, } } -// planner is an implementatio of the interpretablePlanner interface. +// planner generates an Interpretable execution plan from a CEL Ast. type planner struct { disp Dispatcher provider ref.TypeProvider @@ -89,6 +83,20 @@ type planner struct { decorators []InterpretableDecorator } +// AsyncPlan wraps an Interpretable CEL program in an async evaluation driver. +// +// Async evaluation will stub calls to asynchronous functions, capturing arguments, deduping +// argument sets, and evaluating synchronously if possible. If the synchronous evaluation returns a +// types.Unknown value implicating one or more unresolved async calls, the calls are progressively +// invoked until the evaluation resolves into an error or value. +func (p *planner) AsyncPlan(expr *exprpb.Expr) (AsyncInterpretable, error) { + interp, err := p.Plan(expr) + if err != nil { + return nil, err + } + return &asyncEval{Interpretable: interp}, nil +} + // Plan implements the interpretablePlanner interface. This implementation of the Plan method also // applies decorators to each Interpretable generated as part of the overall plan. Decorators are // useful for layering functionality into the evaluation that is not natively understood by CEL, @@ -297,6 +305,9 @@ func (p *planner) planCall(expr *exprpb.Expr) (Interpretable, error) { if fnDef == nil { fnDef, _ = p.disp.FindOverload(fnName) } + if fnDef != nil && fnDef.Async != nil { + return p.planCallAsync(expr, fnName, oName, fnDef, args) + } switch argCount { case 0: return p.planCallZero(expr, fnName, oName, fnDef) @@ -309,6 +320,21 @@ func (p *planner) planCall(expr *exprpb.Expr) (Interpretable, error) { } } +// planCallAsync returns an evaluable object which supports asynchronous evaluation. +func (p *planner) planCallAsync(expr *exprpb.Expr, + function string, + overload string, + impl *functions.Overload, + args []Interpretable) (Interpretable, error) { + return &evalAsyncCall{ + id: expr.GetId(), + function: function, + overload: overload, + impl: impl.Async, + args: args, + }, nil +} + // planCallZero generates a zero-arity callable Interpretable. func (p *planner) planCallZero(expr *exprpb.Expr, function string, @@ -318,7 +344,7 @@ func (p *planner) planCallZero(expr *exprpb.Expr, return nil, fmt.Errorf("no such overload: %s()", function) } return &evalZeroArity{ - id: expr.Id, + id: expr.GetId(), function: function, overload: overload, impl: impl.Function, diff --git a/test/BUILD.bazel b/test/BUILD.bazel index 0a415e47..89c91b54 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -2,6 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") package( default_visibility = [ + "//cel:__subpackages__", "//checker:__subpackages__", "//common:__subpackages__", "//interpreter:__subpackages__", @@ -14,12 +15,16 @@ package( go_library( name = "go_default_library", srcs = [ + "async.go", "compare.go", "expr.go", ], importpath = "github.com/google/cel-go/test", deps = [ "//common/operators:go_default_library", + "//common/types:go_default_library", + "//common/types/ref:go_default_library", + "//interpreter/functions:go_default_library", "@com_github_golang_protobuf//proto:go_default_library", "@io_bazel_rules_go//proto/wkt:struct_go_proto", "@org_golang_google_genproto//googleapis/api/expr/v1alpha1:go_default_library", diff --git a/test/async.go b/test/async.go new file mode 100644 index 00000000..344b7e22 --- /dev/null +++ b/test/async.go @@ -0,0 +1,39 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "time" + + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/interpreter/functions" +) + +func FakeRPC(timeout time.Duration) functions.AsyncOp { + return func(ctx context.Context, vars ref.Resolver, args []ref.Val) ref.Val { + rpcCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + time.Sleep(20 * time.Millisecond) + select { + case <-rpcCtx.Done(): + return types.NewErr(rpcCtx.Err().Error()) + default: + in := args[0].(types.String) + return in.Add(types.String(" success!")) + } + } +}