Skip to content

Commit

Permalink
Asynchronous function invocation support
Browse files Browse the repository at this point in the history
  • Loading branch information
TristonianJones committed Jun 19, 2020
1 parent 2309529 commit 82f50b7
Show file tree
Hide file tree
Showing 10 changed files with 762 additions and 149 deletions.
19 changes: 9 additions & 10 deletions cel/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,13 @@ func (e *Env) ParseSource(src common.Source) (*Ast, *Issues) {

// Program generates an evaluable instance of the Ast within the environment (Env).
func (e *Env) Program(ast *Ast, opts ...ProgramOption) (Program, error) {
optSet := e.progOpts
if len(opts) != 0 {
mergedOpts := []ProgramOption{}
mergedOpts = append(mergedOpts, e.progOpts...)
mergedOpts = append(mergedOpts, opts...)
optSet = mergedOpts
}
return newProgram(e, ast, optSet)
return e.newProgram(ast, opts /* async= */, false)
}

// AsyncProgram generates an evaluable instance of the Ast with support for asynchronous extension
// functions.
func (e *Env) AsyncProgram(ast *Ast, opts ...ProgramOption) (AsyncProgram, error) {
return e.newProgram(ast, opts /* async= */, true)
}

// SetFeature sets the given feature flag, as enumerated in options.go.
Expand Down Expand Up @@ -427,8 +426,8 @@ func (i *Issues) Err() error {
if i == nil {
return nil
}
if len(i.errs.GetErrors()) > 0 {
return errors.New(i.errs.ToDisplayString())
if len(i.Errors()) > 0 {
return errors.New(i.String())
}
return nil
}
Expand Down
184 changes: 149 additions & 35 deletions cel/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cel

import (
"context"
"errors"
"fmt"

"github.com/google/cel-go/common/types"
Expand All @@ -28,19 +30,31 @@ import (
type Program interface {
// Eval returns the result of an evaluation of the Ast and environment against the input vars.
//
// The vars value may either be an `interpreter.Activation` or a `map[string]interface{}`.
// The argument value may either be an `interpreter.Activation` or a `map[string]interface{}`.
//
// If the `OptTrackState` or `OptExhaustiveEval` flags are used, the `details` response will
// be non-nil. Given this caveat on `details`, the return state from evaluation will be:
//
// * `val`, `details`, `nil` - Successful evaluation of a non-error result.
// * `val`, `details`, `err` - Successful evaluation to an error result.
// * `nil`, `details`, `err` - Unsuccessful evaluation.
Eval(interface{}) (ref.Val, *EvalDetails, error)
}

// AsyncProgram is an evaluable view of an Ast which may contain asynchronous compute.
type AsyncProgram interface {
// AsyncEval returns the result of an evaluation of the Ast against a given input.
//
// An unsuccessful evaluation is typically the result of a series of incompatible `EnvOption`
// or `ProgramOption` values used in the creation of the evaluation environment or executable
// program.
Eval(vars interface{}) (ref.Val, *EvalDetails, error)
// The input arguments (apart from Context) and return values for AsyncEval mirror those from
// the standard Eval call.
AsyncEval(context.Context, interface{}) (ref.Val, *EvalDetails, error)
}

// programWrapper embeds both the Program and AsyncProgram interface, but in practice only one
// interface is exposed through the top-level 'cel' package exports.
type programWrapper struct {
Program
AsyncProgram
}

// NoVars returns an empty Activation.
Expand Down Expand Up @@ -92,17 +106,19 @@ func (ed *EvalDetails) State() interpreter.EvalState {
// prog is the internal implementation of the Program interface.
type prog struct {
*Env
evalOpts EvalOption
decorators []interpreter.InterpretableDecorator
defaultVars interpreter.Activation
dispatcher interpreter.Dispatcher
interpreter interpreter.Interpreter
interpretable interpreter.Interpretable
attrFactory interpreter.AttributeFactory
async bool
evalOpts EvalOption
decorators []interpreter.InterpretableDecorator
defaultVars interpreter.Activation
dispatcher interpreter.Dispatcher
interpreter *interpreter.Interpreter
interpretable interpreter.Interpretable
asyncInterpretable interpreter.AsyncInterpretable
attrFactory interpreter.AttributeFactory
}

// progFactory is a helper alias for marking a program creation factory function.
type progFactory func(interpreter.EvalState) (Program, error)
type progFactory func(interpreter.EvalState) (*programWrapper, error)

// progGen holds a reference to a progFactory instance and implements the Program interface.
type progGen struct {
Expand All @@ -113,7 +129,16 @@ type progGen struct {
// ProgramOption values.
//
// If the program cannot be configured the prog will be nil, with a non-nil error response.
func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
func (e *Env) newProgram(ast *Ast,
opts []ProgramOption,
async bool) (*programWrapper, error) {
optSet := e.progOpts
if len(opts) != 0 {
mergedOpts := []ProgramOption{}
mergedOpts = append(mergedOpts, e.progOpts...)
mergedOpts = append(mergedOpts, opts...)
optSet = mergedOpts
}
// Build the dispatcher, interpreter, and default program value.
disp := interpreter.NewDispatcher()

Expand All @@ -123,11 +148,12 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
Env: e,
decorators: []interpreter.InterpretableDecorator{},
dispatcher: disp,
async: async,
}

// Configure the program via the ProgramOption values.
var err error
for _, opt := range opts {
for _, opt := range optSet {
if opt == nil {
return nil, fmt.Errorf("program options should be non-nil")
}
Expand Down Expand Up @@ -159,12 +185,13 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
if p.evalOpts&OptExhaustiveEval == OptExhaustiveEval {
// State tracking requires that each Eval() call operate on an isolated EvalState
// object; hence, the presence of the factory.
factory := func(state interpreter.EvalState) (Program, error) {
factory := func(state interpreter.EvalState) (*programWrapper, error) {
decs := append(decorators, interpreter.ExhaustiveEval(state))
clone := &prog{
evalOpts: p.evalOpts,
defaultVars: p.defaultVars,
Env: e,
async: p.async,
dispatcher: disp,
interpreter: interp}
return initInterpretable(clone, ast, decs)
Expand All @@ -174,7 +201,7 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
// Enable state tracking last since it too requires the factory approach but is less
// featured than the ExhaustiveEval decorator.
if p.evalOpts&OptTrackState == OptTrackState {
factory := func(state interpreter.EvalState) (Program, error) {
factory := func(state interpreter.EvalState) (*programWrapper, error) {
decs := append(decorators, interpreter.TrackState(state))
clone := &prog{
evalOpts: p.evalOpts,
Expand All @@ -191,31 +218,37 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {

// initProgGen tests the factory object by calling it once and returns a factory-based Program if
// the test is successful.
func initProgGen(factory progFactory) (Program, error) {
func initProgGen(factory progFactory) (*programWrapper, error) {
// Test the factory to make sure that configuration errors are spotted at config
_, err := factory(interpreter.NewEvalState())
if err != nil {
return nil, err
}
return &progGen{factory: factory}, nil
pg := &progGen{factory: factory}
wrapper := &programWrapper{Program: pg, AsyncProgram: pg}
return wrapper, nil
}

// initIterpretable creates a checked or unchecked interpretable depending on whether the Ast
// has been run through the type-checker.
func initInterpretable(
p *prog,
func initInterpretable(p *prog,
ast *Ast,
decorators []interpreter.InterpretableDecorator) (Program, error) {
decorators []interpreter.InterpretableDecorator) (*programWrapper, error) {
var err error
// Unchecked programs do not contain type and reference information and may be
// slower to execute than their checked counterparts.
if !ast.IsChecked() {
p.interpretable, err =
p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...)
if p.async {
p.asyncInterpretable, err =
p.interpreter.NewAsyncUncheckedInterpretable(ast.Expr(), decorators...)
} else {
p.interpretable, err =
p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...)
}
if err != nil {
return nil, err
}
return p, nil
return &programWrapper{Program: p, AsyncProgram: p}, nil
}
// When the AST has been checked it contains metadata that can be used to speed up program
// execution.
Expand All @@ -224,16 +257,25 @@ func initInterpretable(
if err != nil {
return nil, err
}
p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...)
if p.async {
p.asyncInterpretable, err =
p.interpreter.NewAsyncInterpretable(checked, decorators...)
} else {
p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...)
}
if err != nil {
return nil, err
}

return p, nil
return &programWrapper{Program: p, AsyncProgram: p}, nil
}

// Eval implements the Program interface method.
func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error) {
// In general this should never happen, since only one view (sync, async) is returned back to
// the caller.
if p.interpretable == nil {
return nil, nil, errors.New("async program invoked synchronously")
}
// Configure error recovery for unexpected panics during evaluation. Note, the use of named
// return values makes it possible to modify the error response during the recovery
// function.
Expand All @@ -242,16 +284,39 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error)
err = fmt.Errorf("internal error: %v", r)
}
}()
// Build a hierarchical activation if there are default vars set.
vars, err := interpreter.NewActivation(input)
vars, err := p.vars(input)
v = p.interpretable.Eval(vars)
// The output of an internal Eval may have a value (`v`) that is a types.Err. This step
// translates the CEL value to a Go error response. This interface does not quite match the
// RPC signature which allows for multiple errors to be returned, but should be sufficient.
if types.IsError(v) {
err = v.Value().(error)
}
return
}

// AsyncEval implements the AsyncProgram interface method.
func (p *prog) AsyncEval(ctx context.Context,
input interface{}) (v ref.Val, det *EvalDetails, err error) {
// In general this should never happen, since only one view (sync, async) is returned back to
// the caller.
if p.asyncInterpretable == nil {
return nil, nil, errors.New("sync program invoked asynchronously")
}
// Configure error recovery for unexpected panics during evaluation. Note, the use of named
// return values makes it possible to modify the error response during the recovery
// function.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("internal error: %v", r)
}
}()
asyncVars, err := p.asyncVars(input)
if err != nil {
return
}
if p.defaultVars != nil {
vars = interpreter.NewHierarchicalActivation(p.defaultVars, vars)
}
v = p.interpretable.Eval(vars)
// The output of an internal Eval may have a value (`v`) that is a types.Err. This step
v = p.asyncInterpretable.AsyncEval(ctx, asyncVars)
// The output of an internal AsyncEval may have a value (`v`) that is a types.Err. This step
// translates the CEL value to a Go error response. This interface does not quite match the
// RPC signature which allows for multiple errors to be returned, but should be sufficient.
if types.IsError(v) {
Expand All @@ -260,6 +325,30 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error)
return
}

// asyncVars creates an AsyncActivation suitable for tracking async invocations in a manner which
// can be used to orchestrate async calls needed to complete expression evaluation.
func (p *prog) asyncVars(input interface{}) (*interpreter.AsyncActivation, error) {
vars, err := p.vars(input)
if err != nil {
return nil, err
}
return interpreter.NewAsyncActivation(vars), nil
}

// vars creates an Activation from the input, and if applicable extends the set of default values
// configured via ProgramOptions.
func (p *prog) vars(input interface{}) (interpreter.Activation, error) {
vars, err := interpreter.NewActivation(input)
if err != nil {
return nil, err
}
// Build a hierarchical activation if there are default vars set.
if p.defaultVars != nil {
return interpreter.NewHierarchicalActivation(p.defaultVars, vars), nil
}
return vars, nil
}

// Eval implements the Program interface method.
func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) {
// The factory based Eval() differs from the standard evaluation model in that it generates a
Expand All @@ -283,3 +372,28 @@ func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) {
}
return v, det, nil
}

// AsyncEval implements the AsyncProgram interface method.
func (gen *progGen) AsyncEval(ctx context.Context,
input interface{}) (ref.Val, *EvalDetails, error) {
// The factory based AsyncEval() differs from the standard evaluation model in that it
// generates a new EvalState instance for each call to ensure that unique evaluations yield
// unique stateful results.
state := interpreter.NewEvalState()
det := &EvalDetails{state: state}

// Generate a new instance of the interpretable using the factory configured during the call to
// newProgram(). It is incredibly unlikely that the factory call will generate an error given
// the factory test performed within the Program() call.
p, err := gen.factory(state)
if err != nil {
return nil, det, err
}

// Evaluate the input, returning the result and the 'state' within EvalDetails.
v, _, err := p.AsyncEval(ctx, input)
if err != nil {
return v, det, err
}
return v, det, nil
}
8 changes: 8 additions & 0 deletions common/types/ref/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,11 @@ type FieldTester func(target interface{}) bool

// FieldGetter is used to get the field value from an input object, if set.
type FieldGetter func(target interface{}) (interface{}, error)

// Resolver abstracts variable and type identifier resolution behind a single interface
// method.
type Resolver interface {
// ResolveName returns the value associated with the given fully qualified name, if
// present.
ResolveName(name string) (interface{}, bool)
}
7 changes: 4 additions & 3 deletions interpreter/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ type Activation interface {

// EmptyActivation returns a variable free activation.
func EmptyActivation() Activation {
// This call cannot fail.
a, _ := NewActivation(map[string]interface{}{})
return a
return emptyActivation
}

// NewActivation returns an activation based on a map-based binding where the map keys are
Expand Down Expand Up @@ -197,6 +195,9 @@ func (v *varActivation) ResolveName(name string) (interface{}, bool) {
}

var (
// emptyActivation is a singleton activation which provides no input
emptyActivation = &mapActivation{bindings: map[string]interface{}{}}

// pool of var activations to reduce allocations during folds.
varActivationPool = &sync.Pool{
New: func() interface{} {
Expand Down
Loading

0 comments on commit 82f50b7

Please sign in to comment.