Skip to content

Commit

Permalink
feat(capabilities/compute): pass module config from registry for comp…
Browse files Browse the repository at this point in the history
…ute (#14708)

* chore(deps): omits a local dev folder

* feat(workflows): change precedence of config merge

* feat(compute): adds a config transformer

* feat(compute): extracts module config from req

* feat(keystone/scripts): deploy custom compute action with default config
  • Loading branch information
MStreet3 authored Oct 11, 2024
1 parent 02dd4cb commit 100ca1e
Show file tree
Hide file tree
Showing 7 changed files with 553 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tools/clroot/db.sqlite3-wal
debug.env
*.txt
operator_ui/install
.devenv

# codeship
*.aes
Expand Down
51 changes: 18 additions & 33 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

const (
CapabilityIDCompute = "[email protected]"

binaryKey = "binary"
configKey = "config"
binaryKey = "binary"
configKey = "config"
maxMemoryMBsKey = "maxMemoryMBs"
timeoutKey = "timeout"
tickIntervalKey = "tickInterval"
)

var (
Expand Down Expand Up @@ -65,6 +67,8 @@ type Compute struct {
log logger.Logger
registry coretypes.CapabilitiesRegistry
modules *moduleCache

transformer ConfigTransformer
}

func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -91,34 +95,29 @@ func copyRequest(req capabilities.CapabilityRequest) capabilities.CapabilityRequ
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
copied := copyRequest(request)

binary, err := c.popBytesValue(copied.Config, binaryKey)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err)
}

config, err := c.popBytesValue(copied.Config, configKey)
cfg, err := c.transformer.Transform(copied.Config, WithLogger(c.log))
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err)
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: could not transform config: %w", err)
}

id := generateID(binary)
id := generateID(cfg.Binary)

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

m = mod
}

return c.executeWithModule(m.module, config, request)
return c.executeWithModule(m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, referenceID string) (*module, error) {
initStart := time.Now()
mod, err := host.NewModule(&host.ModuleConfig{Logger: c.log}, binary)
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
}
Expand All @@ -133,21 +132,6 @@ func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID s
return m, nil
}

func (c *Compute) popBytesValue(m *values.Map, key string) ([]byte, error) {
v, ok := m.Underlying[key]
if !ok {
return nil, fmt.Errorf("could not find %q in map", key)
}

vb, ok := v.(*values.Bytes)
if !ok {
return nil, fmt.Errorf("value is not bytes: %q", key)
}

delete(m.Underlying, key)
return vb.Underlying, nil
}

func (c *Compute) executeWithModule(module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
executeStart := time.Now()
capReq := capabilitiespb.CapabilityRequestToProto(req)
Expand Down Expand Up @@ -204,9 +188,10 @@ func (c *Compute) Close() error {

func NewAction(log logger.Logger, registry coretypes.CapabilitiesRegistry) *Compute {
compute := &Compute{
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(),
}
return compute
}
149 changes: 149 additions & 0 deletions core/capabilities/compute/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package compute

import (
"errors"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
)

type Transformer[T any, U any] interface {
Transform(T, ...func(*U)) (*U, error)
}

type ConfigTransformer = Transformer[*values.Map, ParsedConfig]

type ParsedConfig struct {
Binary []byte
Config []byte
ModuleConfig *host.ModuleConfig
}

type transformer struct{}

func (t *transformer) Transform(in *values.Map, opts ...func(*ParsedConfig)) (*ParsedConfig, error) {
binary, err := popValue[[]byte](in, binaryKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}

config, err := popValue[[]byte](in, configKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}

maxMemoryMBs, err := popOptionalValue[int64](in, maxMemoryMBsKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}

mc := &host.ModuleConfig{
MaxMemoryMBs: maxMemoryMBs,
}

timeout, err := popOptionalValue[string](in, timeoutKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}

var td time.Duration
if timeout != "" {
td, err = time.ParseDuration(timeout)
if err != nil {
return nil, NewInvalidRequestError(err)
}
mc.Timeout = &td
}

tickInterval, err := popOptionalValue[string](in, tickIntervalKey)
if err != nil {
return nil, NewInvalidRequestError(err)
}

var ti time.Duration
if tickInterval != "" {
ti, err = time.ParseDuration(tickInterval)
if err != nil {
return nil, NewInvalidRequestError(err)
}
mc.TickInterval = ti
}

pc := &ParsedConfig{
Binary: binary,
Config: config,
ModuleConfig: mc,
}

for _, opt := range opts {
opt(pc)
}

return pc, nil
}

func NewTransformer() *transformer {
return &transformer{}
}

func WithLogger(l logger.Logger) func(*ParsedConfig) {
return func(pc *ParsedConfig) {
pc.ModuleConfig.Logger = l
}
}

func popOptionalValue[T any](m *values.Map, key string) (T, error) {
v, err := popValue[T](m, key)
if err != nil {
var nfe *NotFoundError
if errors.As(err, &nfe) {
return v, nil
}
return v, err
}
return v, nil
}

func popValue[T any](m *values.Map, key string) (T, error) {
var empty T

wrapped, ok := m.Underlying[key]
if !ok {
return empty, NewNotFoundError(key)
}

delete(m.Underlying, key)
err := wrapped.UnwrapTo(&empty)
if err != nil {
return empty, fmt.Errorf("could not unwrap value: %w", err)
}

return empty, nil
}

type NotFoundError struct {
Key string
}

func (e *NotFoundError) Error() string {
return fmt.Sprintf("could not find %q in map", e.Key)
}

func NewNotFoundError(key string) *NotFoundError {
return &NotFoundError{Key: key}
}

type InvalidRequestError struct {
Err error
}

func (e *InvalidRequestError) Error() string {
return fmt.Sprintf("invalid request: %v", e.Err)
}

func NewInvalidRequestError(err error) *InvalidRequestError {
return &InvalidRequestError{Err: err}
}
Loading

0 comments on commit 100ca1e

Please sign in to comment.