Skip to content

Commit

Permalink
moving core Labeler functionality to core/monitoring. Consuming in wo…
Browse files Browse the repository at this point in the history
…rkflows and registrysyncer packages
  • Loading branch information
patrickhuie19 committed Oct 12, 2024
1 parent be6a82e commit 2be3b36
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 224 deletions.
127 changes: 127 additions & 0 deletions core/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package monitoring

import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
beholderpb "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"
"google.golang.org/protobuf/proto"
)

type CustomMessageLabeler struct {
labels map[string]string
}

func NewCustomMessageLabeler() CustomMessageLabeler {
return CustomMessageLabeler{labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c CustomMessageLabeler) With(keyValues ...string) CustomMessageLabeler {
newCustomMessageLabeler := NewCustomMessageLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.labels {
newCustomMessageLabeler.labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMessageLabeler.labels[key] = value
}

return newCustomMessageLabeler
}

// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (c CustomMessageLabeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, c.labels)
}

type MetricsLabeler struct {
Labels map[string]string
}

func NewMetricsLabeler() MetricsLabeler {
return MetricsLabeler{Labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c MetricsLabeler) With(keyValues ...string) MetricsLabeler {
newCustomMetricsLabeler := NewMetricsLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.Labels {
newCustomMetricsLabeler.Labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMetricsLabeler.Labels[key] = value
}

return newCustomMetricsLabeler
}

// sendLogAsCustomMessageF formats into a msg to be consumed by sendLogAsCustomMessageW
func sendLogAsCustomMessageF(labels map[string]string, format string, values ...any) error {
return sendLogAsCustomMessageW(fmt.Sprintf(format, values...), labels)
}

// sendLogAsCustomMessageV allows the consumer to pass in variable number of label key value pairs
func sendLogAsCustomMessageV(msg string, labelKVs ...string) error {
if len(labelKVs)%2 != 0 {
return fmt.Errorf("labelKVs must be provided in key-value pairs")
}

labels := make(map[string]string)
for i := 0; i < len(labelKVs); i += 2 {
key := labelKVs[i]
value := labelKVs[i+1]
labels[key] = value
}

return sendLogAsCustomMessageF(labels, msg)
}

func sendLogAsCustomMessageW(msg string, labels map[string]string) error {
protoLabels := make(map[string]*valuespb.Value)
for _, l := range labels {
protoLabels[l] = &valuespb.Value{Value: &valuespb.Value_StringValue{StringValue: labels[l]}}
}
// Define a custom protobuf payload to emit
payload := &beholderpb.BaseMessage{
Msg: msg,
Labels: protoLabels,
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return fmt.Errorf("sending custom message failed to marshal protobuf: %w", err)
}

err = beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/beholder-base-message/versions/1", // required
"beholder_data_type", "custom_message",
)
if err != nil {
return fmt.Errorf("sending custom message failed on emit: %w", err)
}

return nil
}
15 changes: 15 additions & 0 deletions core/monitoring/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package monitoring

import (
"github.com/stretchr/testify/assert"
"testing"
)

// tests CustomMessageAgent does not share state across new instances created by `With`
func Test_CustomMessageAgent(t *testing.T) {
cma := NewCustomMessageLabeler()
cma1 := cma.With("key1", "value1")
cma2 := cma1.With("key2", "value2")

assert.NotEqual(t, cma1.labels, cma2.labels)
}
11 changes: 11 additions & 0 deletions core/monitoring/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package monitoring

import "go.opentelemetry.io/otel/attribute"

func KvMapToOtelAttributes(kvmap map[string]string) []attribute.KeyValue {
otelKVs := make([]attribute.KeyValue, len(kvmap))
for k, v := range kvmap {
otelKVs = append(otelKVs, attribute.String(k, v))
}
return otelKVs
}
7 changes: 7 additions & 0 deletions core/monitoring/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package monitoring

import "testing"

func Test_KvMapToOtelAttributes(t *testing.T) {

}
46 changes: 46 additions & 0 deletions core/services/registrysyncer/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package registrysyncer

import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink/v2/core/monitoring"
"go.opentelemetry.io/otel/metric"
)

var remoteRegistrySyncFailureCounter metric.Int64Counter
var launcherFailureCounter metric.Int64Counter

func initMonitoringResources() (err error) {
remoteRegistrySyncFailureCounter, err = beholder.GetMeter().Int64Counter("RemoteRegistrySyncFailure")
if err != nil {
return fmt.Errorf("failed to register sync failure counter: %w", err)
}

launcherFailureCounter, err = beholder.GetMeter().Int64Counter("LauncherFailureCounter")
if err != nil {
return fmt.Errorf("failed to register launcher failure counter: %w", err)
}

return nil
}

// syncerMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities
// for monitoring resources
type syncerMetricLabeler struct {
monitoring.MetricsLabeler
}

func (c syncerMetricLabeler) with(keyValues ...string) syncerMetricLabeler {
return syncerMetricLabeler{c.With(keyValues...)}
}

func (c syncerMetricLabeler) incrementRemoteRegistryFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
remoteRegistrySyncFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c syncerMetricLabeler) incrementLauncherFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
launcherFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
17 changes: 17 additions & 0 deletions core/services/registrysyncer/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package registrysyncer

import (
"github.com/smartcontractkit/chainlink/v2/core/monitoring"
"github.com/stretchr/testify/require"
"testing"
)

func Test_InitMonitoringResources(t *testing.T) {
require.NoError(t, initMonitoringResources())
}

func Test_SyncerMetricsLabeler(t *testing.T) {
testSyncerMetricLabeler := syncerMetricLabeler{monitoring.NewMetricsLabeler()}
testSyncerMetricLabeler2 := testSyncerMetricLabeler.with("foo", "baz")
require.EqualValues(t, testSyncerMetricLabeler2.Labels["foo"], "baz")
}
8 changes: 7 additions & 1 deletion core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RegistrySyncer interface {

type registrySyncer struct {
services.StateMachine
metrics syncerMetricLabeler
stopCh services.StopChan
launchers []Launcher
reader types.ContractReader
Expand Down Expand Up @@ -130,6 +131,8 @@ func newReader(ctx context.Context, lggr logger.Logger, relayer ContractReaderFa

func (s *registrySyncer) Start(ctx context.Context) error {
return s.StartOnce("RegistrySyncer", func() error {
initMonitoringResources()

s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand All @@ -153,7 +156,8 @@ func (s *registrySyncer) syncLoop() {

// Sync for a first time outside the loop; this means we'll start a remote
// sync immediately once spinning up syncLoop, as by default a ticker will
// fire for the first time at T+N, where N is the interval.
// fire for the first time at T+N, where N is the interval. We do not
// increment RemoteRegistryFailureCounter the first time
s.lggr.Debug("starting initial sync with remote registry")
err := s.Sync(ctx, true)
if err != nil {
Expand All @@ -169,6 +173,7 @@ func (s *registrySyncer) syncLoop() {
err := s.Sync(ctx, false)
if err != nil {
s.lggr.Errorw("failed to sync with remote registry", "error", err)
s.metrics.incrementRemoteRegistryFailureCounter(ctx)
}
}
}
Expand Down Expand Up @@ -319,6 +324,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
lrCopy := deepCopyLocalRegistry(lr)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
}
}

Expand Down
29 changes: 15 additions & 14 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/smartcontractkit/chainlink/v2/core/monitoring"
"sync"
"time"

Expand Down Expand Up @@ -91,8 +92,8 @@ func (sucm *stepUpdateManager) len() int64 {
// Engine handles the lifecycle of a single workflow and its executions.
type Engine struct {
services.StateMachine
cma customMessageLabeler
metrics customMetricsLabeler
cma monitoring.CustomMessageLabeler
metrics workflowsMetricLabeler
logger logger.Logger
registry core.CapabilitiesRegistry
workflow *workflow
Expand Down Expand Up @@ -159,7 +160,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to get trigger capability: %s", err)
e.cma.with(cIDKey, t.ID).sendLogAsCustomMessage(fmt.Sprintf("failed to resolve trigger: %s", err))
e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to resolve trigger: %s", err))
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand Down Expand Up @@ -189,7 +190,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {

err := e.initializeCapability(ctx, s)
if err != nil {
e.cma.with(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref).sendLogAsCustomMessage(fmt.Sprintf("failed to initialize capability for step: %s", err))
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref).SendLogAsCustomMessage(fmt.Sprintf("failed to initialize capability for step: %s", err))
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
Expand Down Expand Up @@ -337,7 +338,7 @@ func (e *Engine) init(ctx context.Context) {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to register trigger: %s", terr)
cerr := e.cma.with(cIDKey, t.ID).sendLogAsCustomMessage(fmt.Sprintf("failed to register trigger: %s", terr))
cerr := e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to register trigger: %s", terr))
if cerr != nil {
e.logger.Errorf("failed to send custom message for trigger: %s", terr)
}
Expand Down Expand Up @@ -580,7 +581,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.with(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -613,7 +614,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
// This is to ensure that any side effects are executed consistently, since otherwise
// the async nature of the workflow engine would provide no guarantees.
}
err = cma.sendLogAsCustomMessage(fmt.Sprintf("execution status: %s", status))
err = cma.SendLogAsCustomMessage(fmt.Sprintf("execution status: %s", status))
if err != nil {
return err
}
Expand Down Expand Up @@ -734,7 +735,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.with(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand All @@ -744,22 +745,22 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
}

// TODO ks-462 inputs
cma.sendLogAsCustomMessage("executing step")
cma.SendLogAsCustomMessage("executing step")
inputs, outputs, err := e.executeStep(ctx, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
lmsg := "step executed successfully with a termination"
l.Info(lmsg)
cmErr := cma.sendLogAsCustomMessage(lmsg)
cmErr := cma.SendLogAsCustomMessage(lmsg)
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
stepStatus = store.StatusCompletedEarlyExit
case err != nil:
lmsg := "step executed successfully with a termination"
l.Errorf("error executing step request: %s", err)
cmErr := cma.sendLogAsCustomMessage(fmt.Sprintf("error executing step request: %s", err))
cmErr := cma.SendLogAsCustomMessage(fmt.Sprintf("error executing step request: %s", err))
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
Expand All @@ -768,7 +769,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
lmsg := "step executed successfully with a termination"
l.With("outputs", outputs).Info("step executed successfully")
// TODO ks-462 emit custom message with outputs
cmErr := cma.sendLogAsCustomMessage("step executed successfully")
cmErr := cma.SendLogAsCustomMessage("step executed successfully")
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
Expand Down Expand Up @@ -1152,8 +1153,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) {

engine = &Engine{
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
cma: NewCustomMessageLabeler().with(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: NewCustomMetricsLabeler().with(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
cma: monitoring.NewCustomMessageLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: workflowsMetricLabeler{monitoring.NewMetricsLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
env: exec.Env{
Expand Down
Loading

0 comments on commit 2be3b36

Please sign in to comment.