From 82c00bbb60d2a05f218f50ac16ee1d5c4c23188b Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Mon, 26 Aug 2024 18:57:09 -0700 Subject: [PATCH 01/11] [KS-420] Add rate-limiting to Dispatcher --- core/capabilities/remote/dispatcher.go | 18 +++++++++--------- core/config/capabilities_config.go | 1 + core/config/toml/types.go | 3 ++- core/services/chainlink/application.go | 1 + 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index bed485c286e..4661617b113 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -14,7 +14,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -44,7 +43,9 @@ type key struct { var _ services.Service = &dispatcher{} +// TODO read those and also new rate-limiter config from node's TOML config const supportedVersion = 1 +const receiverBufferSize = 10000 func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) *dispatcher { return &dispatcher{ @@ -85,14 +86,12 @@ var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.", }, []string{"capabilityId", "donId"}) -const receiverBufferSize = 10000 - type receiver struct { cancel context.CancelFunc - ch chan *remotetypes.MessageBody + ch chan *types.MessageBody } -func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotetypes.Receiver) error { +func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error { d.mu.Lock() defer d.mu.Unlock() k := key{capabilityId, donId} @@ -101,7 +100,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotety return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId) } - receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize) + receiverCh := make(chan *types.MessageBody, receiverBufferSize) ctx, cancelCtx := d.stopCh.NewCtx() d.wg.Add(1) @@ -139,7 +138,7 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) { } } -func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { +func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { msgBody.Version = supportedVersion msgBody.Sender = d.peerID[:] msgBody.Receiver = peerID[:] @@ -152,7 +151,7 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo if err != nil { return err } - msg := &remotetypes.Message{Signature: signature, Body: rawBody} + msg := &types.Message{Signature: signature, Body: rawBody} rawMsg, err := proto.Marshal(msg) if err != nil { return err @@ -168,6 +167,7 @@ func (d *dispatcher) receive() { d.lggr.Info("stopped - exiting receive") return case msg := <-recvCh: + // TODO: apply rate-limiting per msg.Sender using rate.NewLimiter() body, err := ValidateMessage(msg, d.peerID) if err != nil { d.lggr.Debugw("received invalid message", "error", err) @@ -195,7 +195,7 @@ func (d *dispatcher) receive() { } } -func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) { +func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *types.MessageBody, errType types.Error) { if body == nil { return } diff --git a/core/config/capabilities_config.go b/core/config/capabilities_config.go index ae542c062c5..21da660eab5 100644 --- a/core/config/capabilities_config.go +++ b/core/config/capabilities_config.go @@ -13,5 +13,6 @@ type CapabilitiesExternalRegistry interface { type Capabilities interface { Peering() P2P + // TODO: add Dispatcher Config here + all parsers, tests, README ExternalRegistry() CapabilitiesExternalRegistry } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 0c91ddd81a9..4fd305b6ee2 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1435,7 +1435,8 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) { } type Capabilities struct { - Peering P2P `toml:",omitempty"` + Peering P2P `toml:",omitempty"` + // TODO: add Dispatcher config here ExternalRegistry ExternalRegistry `toml:",omitempty"` } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 3efc92c2270..b6e21302d2d 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -213,6 +213,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger) signer := externalPeer externalPeerWrapper = externalPeer + // TODO: pass new dispatcher config from TOML config: cfg.Capabilities().Dispatcher() remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) dispatcher = remoteDispatcher } else { From c06cd4f96ac6fd3edbfd2ae99d365a791d75fee4 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 12:57:44 -0400 Subject: [PATCH 02/11] Implements rate limiter on dispatcher --- core/capabilities/remote/dispatcher.go | 40 +++++++---- core/capabilities/remote/dispatcher_test.go | 80 +++++++++++++++++++-- core/config/capabilities_config.go | 2 +- core/config/dispatcher_config.go | 14 ++++ core/config/toml/types.go | 44 +++++++++++- core/services/chainlink/application.go | 6 +- 6 files changed, 165 insertions(+), 21 deletions(-) create mode 100644 core/config/dispatcher_config.go diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index 4661617b113..cc2f45222d2 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -2,19 +2,22 @@ package remote import ( "context" - "errors" "fmt" - sync "sync" + "sync" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -24,11 +27,13 @@ var ( // dispatcher en/decodes messages and routes traffic between peers and capabilities type dispatcher struct { + cfg config.Dispatcher peerWrapper p2ptypes.PeerWrapper peer p2ptypes.Peer peerID p2ptypes.PeerID signer p2ptypes.Signer registry core.CapabilitiesRegistry + rateLimiter *common.RateLimiter receivers map[key]*receiver mu sync.RWMutex stopCh services.StopChan @@ -43,19 +48,26 @@ type key struct { var _ services.Service = &dispatcher{} -// TODO read those and also new rate-limiter config from node's TOML config -const supportedVersion = 1 -const receiverBufferSize = 10000 - -func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) *dispatcher { +func NewDispatcher(cfg config.Dispatcher, peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) (*dispatcher, error) { + rl, err := common.NewRateLimiter(common.RateLimiterConfig{ + GlobalRPS: cfg.RateLimit().GlobalRPS(), + GlobalBurst: cfg.RateLimit().GlobalBurst(), + PerSenderRPS: cfg.RateLimit().RPS(), + PerSenderBurst: cfg.RateLimit().Burst(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create rate limiter") + } return &dispatcher{ + cfg: cfg, peerWrapper: peerWrapper, signer: signer, registry: registry, + rateLimiter: rl, receivers: make(map[key]*receiver), stopCh: make(services.StopChan), lggr: lggr.Named("Dispatcher"), - } + }, nil } func (d *dispatcher) Start(ctx context.Context) error { @@ -100,7 +112,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Re return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId) } - receiverCh := make(chan *types.MessageBody, receiverBufferSize) + receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize()) ctx, cancelCtx := d.stopCh.NewCtx() d.wg.Add(1) @@ -139,7 +151,7 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) { } func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { - msgBody.Version = supportedVersion + msgBody.Version = uint32(d.cfg.SupportedVersion()) msgBody.Sender = d.peerID[:] msgBody.Receiver = peerID[:] msgBody.Timestamp = time.Now().UnixMilli() @@ -167,7 +179,11 @@ func (d *dispatcher) receive() { d.lggr.Info("stopped - exiting receive") return case msg := <-recvCh: - // TODO: apply rate-limiting per msg.Sender using rate.NewLimiter() + if !d.rateLimiter.Allow(msg.Sender.String()) { + d.lggr.Debugw("rate limit exceeded, dropping message", "sender", msg.Sender) + // TODO: do we just drop the message, or do we need to respond with an error? + continue + } body, err := ValidateMessage(msg, d.peerID) if err != nil { d.lggr.Debugw("received invalid message", "error", err) @@ -184,7 +200,7 @@ func (d *dispatcher) receive() { continue } - receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize + receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize()) capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage) select { case receiver.ch <- body: diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index 7ea4c2e2626..72f62e2ec66 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -32,6 +33,47 @@ func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody) r.ch <- msg } +type testRateLimitConfig struct { + globalRPS float64 + globalBurst int + rps float64 + burst int +} + +func (c testRateLimitConfig) GlobalRPS() float64 { + return c.globalRPS +} + +func (c testRateLimitConfig) GlobalBurst() int { + return c.globalBurst +} + +func (c testRateLimitConfig) RPS() float64 { + return c.rps +} + +func (c testRateLimitConfig) Burst() int { + return c.burst +} + +type testConfig struct { + supportedVersion int + receiverBufferSize int + rateLimit testRateLimitConfig +} + +func (c testConfig) SupportedVersion() int { + return c.supportedVersion +} + +func (c testConfig) ReceiverBufferSize() int { + return c.receiverBufferSize +} + +func (c testConfig) RateLimit() config.DispatcherRateLimit { + return c.rateLimit +} + func TestDispatcher_CleanStartClose(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) @@ -44,7 +86,17 @@ func TestDispatcher_CleanStartClose(t *testing.T) { signer := mocks.NewSigner(t) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 3.0, + globalBurst: 3, + rps: 1.0, + burst: 2, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) require.NoError(t, dispatcher.Close()) } @@ -65,11 +117,21 @@ func TestDispatcher_Receive(t *testing.T) { signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented")) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 3.0, + globalBurst: 3, + rps: 1.0, + burst: 2, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) rcv := newReceiver() - err := dispatcher.SetReceiver(capId1, donId1, rcv) + err = dispatcher.SetReceiver(capId1, donId1, rcv) require.NoError(t, err) // supported capability @@ -113,7 +175,17 @@ func TestDispatcher_RespondWithError(t *testing.T) { signer.On("Sign", mock.Anything).Return([]byte{}, nil) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 3.0, + globalBurst: 3, + rps: 1.0, + burst: 2, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) // unknown capability diff --git a/core/config/capabilities_config.go b/core/config/capabilities_config.go index 21da660eab5..8c10896d093 100644 --- a/core/config/capabilities_config.go +++ b/core/config/capabilities_config.go @@ -13,6 +13,6 @@ type CapabilitiesExternalRegistry interface { type Capabilities interface { Peering() P2P - // TODO: add Dispatcher Config here + all parsers, tests, README + Dispatcher() Dispatcher ExternalRegistry() CapabilitiesExternalRegistry } diff --git a/core/config/dispatcher_config.go b/core/config/dispatcher_config.go new file mode 100644 index 00000000000..5076f2582f3 --- /dev/null +++ b/core/config/dispatcher_config.go @@ -0,0 +1,14 @@ +package config + +type DispatcherRateLimit interface { + GlobalRPS() float64 + GlobalBurst() int + RPS() float64 + Burst() int +} + +type Dispatcher interface { + SupportedVersion() int + ReceiverBufferSize() int + RateLimit() DispatcherRateLimit +} diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 4fd305b6ee2..b380e163e0a 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1434,9 +1434,49 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) { } } +type Dispatcher struct { + SupportedVersion *int + ReceiverBufferSize *int + RateLimit DispatcherRateLimit +} + +func (d *Dispatcher) setFrom(f *Dispatcher) { + d.RateLimit.setFrom(&f.RateLimit) + + if f.ReceiverBufferSize != nil { + d.ReceiverBufferSize = f.ReceiverBufferSize + } + + if f.SupportedVersion != nil { + d.SupportedVersion = f.SupportedVersion + } +} + +type DispatcherRateLimit struct { + GlobalRPS *float64 + GlobalBurst *int + RPS *float64 + Burst *int +} + +func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) { + if f.GlobalRPS != nil { + drl.GlobalRPS = f.GlobalRPS + } + if f.GlobalBurst != nil { + drl.GlobalBurst = f.GlobalBurst + } + if f.RPS != nil { + drl.RPS = f.RPS + } + if f.Burst != nil { + drl.Burst = f.Burst + } +} + type Capabilities struct { - Peering P2P `toml:",omitempty"` - // TODO: add Dispatcher config here + Peering P2P `toml:",omitempty"` + Dispatcher Dispatcher `toml:"omitempty"` ExternalRegistry ExternalRegistry `toml:",omitempty"` } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index b6e21302d2d..6df51045c02 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -213,8 +213,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger) signer := externalPeer externalPeerWrapper = externalPeer - // TODO: pass new dispatcher config from TOML config: cfg.Capabilities().Dispatcher() - remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) + remoteDispatcher, err := remote.NewDispatcher(cfg.Capabilities().Dispatcher(), externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) + if err != nil { + return nil, fmt.Errorf("could not create dispatcher: %w", err) + } dispatcher = remoteDispatcher } else { dispatcher = opts.CapabilitiesDispatcher From 2c87e13163710628531ce02621ed1256b1a21eb6 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 13:11:09 -0400 Subject: [PATCH 03/11] Improves TOML config --- core/capabilities/remote/dispatcher.go | 1 - core/config/toml/types.go | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index cc2f45222d2..3618401534e 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -181,7 +181,6 @@ func (d *dispatcher) receive() { case msg := <-recvCh: if !d.rateLimiter.Allow(msg.Sender.String()) { d.lggr.Debugw("rate limit exceeded, dropping message", "sender", msg.Sender) - // TODO: do we just drop the message, or do we need to respond with an error? continue } body, err := ValidateMessage(msg, d.peerID) diff --git a/core/config/toml/types.go b/core/config/toml/types.go index b380e163e0a..c51b51a7d40 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1453,10 +1453,10 @@ func (d *Dispatcher) setFrom(f *Dispatcher) { } type DispatcherRateLimit struct { - GlobalRPS *float64 - GlobalBurst *int - RPS *float64 - Burst *int + GlobalRPS *float64 + GlobalBurst *int + PerSenderRPS *float64 + PerSenderBurst *int } func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) { @@ -1466,17 +1466,17 @@ func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) { if f.GlobalBurst != nil { drl.GlobalBurst = f.GlobalBurst } - if f.RPS != nil { - drl.RPS = f.RPS + if f.PerSenderRPS != nil { + drl.PerSenderRPS = f.PerSenderRPS } - if f.Burst != nil { - drl.Burst = f.Burst + if f.PerSenderBurst != nil { + drl.PerSenderBurst = f.PerSenderBurst } } type Capabilities struct { Peering P2P `toml:",omitempty"` - Dispatcher Dispatcher `toml:"omitempty"` + Dispatcher Dispatcher `toml:",omitempty"` ExternalRegistry ExternalRegistry `toml:",omitempty"` } From 5e8f766201f8025b4e012d75a2441efe0fdcc183 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 13:34:48 -0400 Subject: [PATCH 04/11] Updates toml config docs --- core/config/docs/core.toml | 16 +++++++++++ core/config/toml/types.go | 1 + docs/CONFIG.md | 54 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index d0960779c6c..1f5fc2b066c 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -450,6 +450,22 @@ NetworkID = 'evm' # Default # ChainID identifies the target chain id where the remote registry is located. ChainID = '1' # Default +[Capabilities.Dispatcher] +# SupportedVersion is the version of the dispatcher that the node supports. +SupportedVersion = 1 # Default +# ReceiverBufferSize is the size of the buffer for incoming messages. +ReceiverBufferSize = 10000 # Default + +[Capabilities.Dispatcher.RateLimit] +# GlobalRPS is the global rate limit for the dispatcher. +GlobalRPS = 800 # Default +# GlobalBurst is the global burst limit for the dispatcher. +GlobalBurst = 1000 # Default +# PerSenderRPS is the per-sender rate limit for the dispatcher. +PerSenderRPS = 5 # Example +# PerSenderBurst is the per-sender burst limit for the dispatcher. +PerSenderBurst = 10 # Example + [Capabilities.Peering] # IncomingMessageBufferSize is the per-remote number of incoming # messages to buffer. Any additional messages received on top of those diff --git a/core/config/toml/types.go b/core/config/toml/types.go index c51b51a7d40..fe50f6d833b 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1483,6 +1483,7 @@ type Capabilities struct { func (c *Capabilities) setFrom(f *Capabilities) { c.Peering.setFrom(&f.Peering) c.ExternalRegistry.setFrom(&f.ExternalRegistry) + c.Dispatcher.setFrom(&f.Dispatcher) } type ThresholdKeyShareSecrets struct { diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 883bb49d316..14a4eb3c4f9 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1233,6 +1233,60 @@ ChainID = '1' # Default ``` ChainID identifies the target chain id where the remote registry is located. +## Capabilities.Dispatcher +```toml +[Capabilities.Dispatcher] +SupportedVersion = 1 # Default +ReceiverBufferSize = 10000 # Default +``` + + +### SupportedVersion +```toml +SupportedVersion = 1 # Default +``` +SupportedVersion is the version of the dispatcher that the node supports. + +### ReceiverBufferSize +```toml +ReceiverBufferSize = 10000 # Default +``` +ReceiverBufferSize is the size of the buffer for incoming messages. + +## Capabilities.Dispatcher.RateLimit +```toml +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800 # Default +GlobalBurst = 1000 # Default +PerSenderRPS = 5 # Example +PerSenderBurst = 10 # Example +``` + + +### GlobalRPS +```toml +GlobalRPS = 800 # Default +``` +GlobalRPS is the global rate limit for the dispatcher. + +### GlobalBurst +```toml +GlobalBurst = 1000 # Default +``` +GlobalBurst is the global burst limit for the dispatcher. + +### PerSenderRPS +```toml +PerSenderRPS = 5 # Example +``` +PerSenderRPS is the per-sender rate limit for the dispatcher. + +### PerSenderBurst +```toml +PerSenderBurst = 10 # Example +``` +PerSenderBurst is the per-sender burst limit for the dispatcher. + ## Capabilities.Peering ```toml [Capabilities.Peering] From 0e962f5f1efd804d540cc24804c190f5da30f57c Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 13:42:25 -0400 Subject: [PATCH 05/11] Fixes config_capabilities.go --- .../services/chainlink/config_capabilities.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/core/services/chainlink/config_capabilities.go b/core/services/chainlink/config_capabilities.go index c438ca249dd..22f2230e7f2 100644 --- a/core/services/chainlink/config_capabilities.go +++ b/core/services/chainlink/config_capabilities.go @@ -23,6 +23,46 @@ func (c *capabilitiesConfig) ExternalRegistry() config.CapabilitiesExternalRegis } } +func (c *capabilitiesConfig) Dispatcher() config.Dispatcher { + return &dispatcher{d: c.c.Dispatcher} +} + +type dispatcher struct { + d toml.Dispatcher +} + +func (d *dispatcher) SupportedVersion() int { + return *d.d.SupportedVersion +} + +func (d *dispatcher) ReceiverBufferSize() int { + return *d.d.ReceiverBufferSize +} + +func (d *dispatcher) RateLimit() config.DispatcherRateLimit { + return &dispatcherRateLimit{r: d.d.RateLimit} +} + +type dispatcherRateLimit struct { + r toml.DispatcherRateLimit +} + +func (r *dispatcherRateLimit) GlobalRPS() float64 { + return *r.r.GlobalRPS +} + +func (r *dispatcherRateLimit) GlobalBurst() int { + return *r.r.GlobalBurst +} + +func (r *dispatcherRateLimit) RPS() float64 { + return *r.r.PerSenderRPS +} + +func (r *dispatcherRateLimit) Burst() int { + return *r.r.PerSenderBurst +} + type capabilitiesExternalRegistry struct { c toml.ExternalRegistry } From fb40b3e35d539e3cd8e9822d43b274e4fcbf555f Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 13:43:02 -0400 Subject: [PATCH 06/11] Adds changeset --- .changeset/calm-laws-begin.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/calm-laws-begin.md diff --git a/.changeset/calm-laws-begin.md b/.changeset/calm-laws-begin.md new file mode 100644 index 00000000000..5549f1a966f --- /dev/null +++ b/.changeset/calm-laws-begin.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added Implements rate limiter for capabilities dispatcher From d117bbfda2600410e0c13bed8eeb6ad1bbfbe2b5 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Tue, 27 Aug 2024 14:17:01 -0400 Subject: [PATCH 07/11] Updates txtar files --- testdata/scripts/node/validate/default.txtar | 10 ++++++++++ .../node/validate/disk-based-logging-disabled.txtar | 10 ++++++++++ .../node/validate/disk-based-logging-no-dir.txtar | 10 ++++++++++ .../scripts/node/validate/disk-based-logging.txtar | 10 ++++++++++ testdata/scripts/node/validate/invalid-ocr-p2p.txtar | 10 ++++++++++ testdata/scripts/node/validate/invalid.txtar | 10 ++++++++++ testdata/scripts/node/validate/valid.txtar | 10 ++++++++++ testdata/scripts/node/validate/warnings.txtar | 10 ++++++++++ 8 files changed, 80 insertions(+) diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index ff8b4889c49..5db139a61e7 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -264,6 +264,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 016d416d5f6..c7da39d5d71 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -308,6 +308,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index f8a98b2c49a..9625f93b663 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -308,6 +308,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index aef3b106a59..4656f820675 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -308,6 +308,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 0cdf001eccd..c24a9664927 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -293,6 +293,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 2912a803274..5bf2e70db1d 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -298,6 +298,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index ce40c91f669..123b7b3132c 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -305,6 +305,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index dea40ec8da0..756e768914a 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -287,6 +287,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 0.0 +PerSenderBurst = 0 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' From 9118471ed90a81347f1d4cfd09ae21f7343ada95 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Wed, 28 Aug 2024 12:41:15 -0400 Subject: [PATCH 08/11] Fixes tests + updates docs --- core/capabilities/remote/dispatcher_test.go | 24 ++--- core/config/docs/core.toml | 6 +- core/services/chainlink/config_test.go | 10 ++ .../testdata/config-empty-effective.toml | 10 ++ .../chainlink/testdata/config-full.toml | 10 ++ .../config-multi-chain-effective.toml | 10 ++ .../testdata/config-empty-effective.toml | 10 ++ core/web/resolver/testdata/config-full.toml | 10 ++ .../config-multi-chain-effective.toml | 10 ++ docs/CONFIG.md | 10 +- main_test.go | 2 +- testdata/scripts/health/default.txtar | 95 ------------------- testdata/scripts/node/validate/default.txtar | 4 +- .../disk-based-logging-disabled.txtar | 4 +- .../validate/disk-based-logging-no-dir.txtar | 4 +- .../node/validate/disk-based-logging.txtar | 4 +- .../node/validate/invalid-ocr-p2p.txtar | 4 +- testdata/scripts/node/validate/invalid.txtar | 4 +- testdata/scripts/node/validate/valid.txtar | 4 +- testdata/scripts/node/validate/warnings.txtar | 4 +- 20 files changed, 107 insertions(+), 132 deletions(-) diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index 72f62e2ec66..5dc09d44f90 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -90,10 +90,10 @@ func TestDispatcher_CleanStartClose(t *testing.T) { supportedVersion: 1, receiverBufferSize: 10000, rateLimit: testRateLimitConfig{ - globalRPS: 3.0, - globalBurst: 3, - rps: 1.0, - burst: 2, + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, }, }, wrapper, signer, registry, lggr) require.NoError(t, err) @@ -121,10 +121,10 @@ func TestDispatcher_Receive(t *testing.T) { supportedVersion: 1, receiverBufferSize: 10000, rateLimit: testRateLimitConfig{ - globalRPS: 3.0, - globalBurst: 3, - rps: 1.0, - burst: 2, + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, }, }, wrapper, signer, registry, lggr) require.NoError(t, err) @@ -179,10 +179,10 @@ func TestDispatcher_RespondWithError(t *testing.T) { supportedVersion: 1, receiverBufferSize: 10000, rateLimit: testRateLimitConfig{ - globalRPS: 3.0, - globalBurst: 3, - rps: 1.0, - burst: 2, + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, }, }, wrapper, signer, registry, lggr) require.NoError(t, err) diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 1f5fc2b066c..fd33fe3f5e5 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -451,7 +451,7 @@ NetworkID = 'evm' # Default ChainID = '1' # Default [Capabilities.Dispatcher] -# SupportedVersion is the version of the dispatcher that the node supports. +# SupportedVersion is the version of the version of message schema. SupportedVersion = 1 # Default # ReceiverBufferSize is the size of the buffer for incoming messages. ReceiverBufferSize = 10000 # Default @@ -462,9 +462,9 @@ GlobalRPS = 800 # Default # GlobalBurst is the global burst limit for the dispatcher. GlobalBurst = 1000 # Default # PerSenderRPS is the per-sender rate limit for the dispatcher. -PerSenderRPS = 5 # Example +PerSenderRPS = 10 # Default # PerSenderBurst is the per-sender burst limit for the dispatcher. -PerSenderBurst = 10 # Example +PerSenderBurst = 50 # Default [Capabilities.Peering] # IncomingMessageBufferSize is the per-remote number of incoming diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 56b0661854e..69e65f9a3f9 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -451,6 +451,16 @@ func TestConfig_Marshal(t *testing.T) { ChainID: ptr("1"), NetworkID: ptr("evm"), }, + Dispatcher: toml.Dispatcher{ + SupportedVersion: ptr(1), + ReceiverBufferSize: ptr(10000), + RateLimit: toml.DispatcherRateLimit{ + GlobalRPS: ptr(800.0), + GlobalBurst: ptr(1000), + PerSenderRPS: ptr(10.0), + PerSenderBurst: ptr(50), + }, + }, } full.Keeper = toml.Keeper{ DefaultTransactionQueueDepth: ptr[uint32](17), diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index f1325d824ea..f6aef66f1c1 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -252,6 +252,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index ff044fff586..a2f71dfd172 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -262,6 +262,16 @@ DeltaDial = '1m0s' DeltaReconcile = '2s' ListenAddresses = ['foo', 'bar'] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 8bfc93c7be0..6fff86d20a7 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -252,6 +252,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index f1325d824ea..f6aef66f1c1 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -252,6 +252,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index 37644c1d221..f4d6c2e46b7 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -262,6 +262,16 @@ DeltaDial = '1m0s' DeltaReconcile = '2s' ListenAddresses = ['foo', 'bar'] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 55f998156c8..41382aba4d3 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -252,6 +252,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 14a4eb3c4f9..243ea5c747c 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1245,7 +1245,7 @@ ReceiverBufferSize = 10000 # Default ```toml SupportedVersion = 1 # Default ``` -SupportedVersion is the version of the dispatcher that the node supports. +SupportedVersion is the version of the version of message schema. ### ReceiverBufferSize ```toml @@ -1258,8 +1258,8 @@ ReceiverBufferSize is the size of the buffer for incoming messages. [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800 # Default GlobalBurst = 1000 # Default -PerSenderRPS = 5 # Example -PerSenderBurst = 10 # Example +PerSenderRPS = 10 # Default +PerSenderBurst = 50 # Default ``` @@ -1277,13 +1277,13 @@ GlobalBurst is the global burst limit for the dispatcher. ### PerSenderRPS ```toml -PerSenderRPS = 5 # Example +PerSenderRPS = 10 # Default ``` PerSenderRPS is the per-sender rate limit for the dispatcher. ### PerSenderBurst ```toml -PerSenderBurst = 10 # Example +PerSenderBurst = 50 # Default ``` PerSenderBurst is the per-sender burst limit for the dispatcher. diff --git a/main_test.go b/main_test.go index 81e056e3b84..d26286b7e2c 100644 --- a/main_test.go +++ b/main_test.go @@ -56,7 +56,7 @@ func TestScripts(t *testing.T) { Dir: path, Setup: commonEnv, ContinueOnError: true, - // UpdateScripts: true, // uncomment to update golden files + UpdateScripts: true, // uncomment to update golden files }) }) return nil diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar index 777d3e5e126..3fece1014e5 100644 --- a/testdata/scripts/health/default.txtar +++ b/testdata/scripts/health/default.txtar @@ -31,99 +31,4 @@ fj293fbBnlQ!f9vNs HTTPPort = $PORT -- out.txt -- -ok HeadReporter -ok JobSpawner -ok Mailbox.Monitor -ok Mercury.WSRPCPool -ok Mercury.WSRPCPool.CacheSet -ok PipelineORM -ok PipelineRunner -ok PipelineRunner.BridgeCache -ok TelemetryManager - -- out.json -- -{ - "data": [ - { - "type": "checks", - "id": "HeadReporter", - "attributes": { - "name": "HeadReporter", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "JobSpawner", - "attributes": { - "name": "JobSpawner", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "Mailbox.Monitor", - "attributes": { - "name": "Mailbox.Monitor", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "Mercury.WSRPCPool", - "attributes": { - "name": "Mercury.WSRPCPool", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "Mercury.WSRPCPool.CacheSet", - "attributes": { - "name": "Mercury.WSRPCPool.CacheSet", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "PipelineORM", - "attributes": { - "name": "PipelineORM", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "PipelineRunner", - "attributes": { - "name": "PipelineRunner", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "PipelineRunner.BridgeCache", - "attributes": { - "name": "PipelineRunner.BridgeCache", - "status": "passing", - "output": "" - } - }, - { - "type": "checks", - "id": "TelemetryManager", - "attributes": { - "name": "TelemetryManager", - "status": "passing", - "output": "" - } - } - ] -} diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 5db139a61e7..6eaa50d81d4 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -271,8 +271,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index c7da39d5d71..fd9a1f49443 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -315,8 +315,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 9625f93b663..aae6a70ffd9 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -315,8 +315,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 4656f820675..7f43405a8f1 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -315,8 +315,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index c24a9664927..3b9f1d8170a 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -300,8 +300,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 5bf2e70db1d..e98de349304 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -305,8 +305,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index 123b7b3132c..01a740b3c10 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -312,8 +312,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index 756e768914a..8e4d3af9ac3 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -294,8 +294,8 @@ ReceiverBufferSize = 10000 [Capabilities.Dispatcher.RateLimit] GlobalRPS = 800.0 GlobalBurst = 1000 -PerSenderRPS = 0.0 -PerSenderBurst = 0 +PerSenderRPS = 10.0 +PerSenderBurst = 50 [Capabilities.ExternalRegistry] Address = '' From 72b2b9564ea86c70a3b2e9df52dd14ea0841cc39 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Wed, 28 Aug 2024 21:02:16 -0400 Subject: [PATCH 09/11] Renames interface methods --- core/capabilities/remote/dispatcher.go | 4 ++-- core/capabilities/remote/dispatcher_test.go | 4 ++-- core/config/dispatcher_config.go | 4 ++-- core/services/chainlink/config_capabilities.go | 4 ++-- main_test.go | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index 3618401534e..f27d691bb66 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -52,8 +52,8 @@ func NewDispatcher(cfg config.Dispatcher, peerWrapper p2ptypes.PeerWrapper, sign rl, err := common.NewRateLimiter(common.RateLimiterConfig{ GlobalRPS: cfg.RateLimit().GlobalRPS(), GlobalBurst: cfg.RateLimit().GlobalBurst(), - PerSenderRPS: cfg.RateLimit().RPS(), - PerSenderBurst: cfg.RateLimit().Burst(), + PerSenderRPS: cfg.RateLimit().PerSenderRPS(), + PerSenderBurst: cfg.RateLimit().PerSenderBurst(), }) if err != nil { return nil, errors.Wrap(err, "failed to create rate limiter") diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index 5dc09d44f90..50edc5f3530 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -48,11 +48,11 @@ func (c testRateLimitConfig) GlobalBurst() int { return c.globalBurst } -func (c testRateLimitConfig) RPS() float64 { +func (c testRateLimitConfig) PerSenderRPS() float64 { return c.rps } -func (c testRateLimitConfig) Burst() int { +func (c testRateLimitConfig) PerSenderBurst() int { return c.burst } diff --git a/core/config/dispatcher_config.go b/core/config/dispatcher_config.go index 5076f2582f3..ec6f13e8f4a 100644 --- a/core/config/dispatcher_config.go +++ b/core/config/dispatcher_config.go @@ -3,8 +3,8 @@ package config type DispatcherRateLimit interface { GlobalRPS() float64 GlobalBurst() int - RPS() float64 - Burst() int + PerSenderRPS() float64 + PerSenderBurst() int } type Dispatcher interface { diff --git a/core/services/chainlink/config_capabilities.go b/core/services/chainlink/config_capabilities.go index 22f2230e7f2..734a5ae2701 100644 --- a/core/services/chainlink/config_capabilities.go +++ b/core/services/chainlink/config_capabilities.go @@ -55,11 +55,11 @@ func (r *dispatcherRateLimit) GlobalBurst() int { return *r.r.GlobalBurst } -func (r *dispatcherRateLimit) RPS() float64 { +func (r *dispatcherRateLimit) PerSenderRPS() float64 { return *r.r.PerSenderRPS } -func (r *dispatcherRateLimit) Burst() int { +func (r *dispatcherRateLimit) PerSenderBurst() int { return *r.r.PerSenderBurst } diff --git a/main_test.go b/main_test.go index d26286b7e2c..81e056e3b84 100644 --- a/main_test.go +++ b/main_test.go @@ -56,7 +56,7 @@ func TestScripts(t *testing.T) { Dir: path, Setup: commonEnv, ContinueOnError: true, - UpdateScripts: true, // uncomment to update golden files + // UpdateScripts: true, // uncomment to update golden files }) }) return nil From 8701bfc8fb59b7e3114e4dd2da5da9348f71936c Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 29 Aug 2024 10:37:19 -0400 Subject: [PATCH 10/11] Fixes CI --- testdata/scripts/health/default.txtar | 109 ++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 14 deletions(-) diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar index 3fece1014e5..25dee9efdb1 100644 --- a/testdata/scripts/health/default.txtar +++ b/testdata/scripts/health/default.txtar @@ -1,34 +1,115 @@ -# start node -exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' -exec chainlink node -c config.toml start -p password -a creds & - -# initialize client -env NODEURL=http://localhost:$PORT -exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL -exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check - -exec chainlink --remote-node-url $NODEURL health cmp stdout out.txt - exec chainlink --remote-node-url $NODEURL health -json cp stdout compact.json exec jq . compact.json cmp stdout out.json - -- testdb.txt -- CL_DATABASE_URL -- testport.txt -- PORT - -- password -- T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ -- creds -- notreal@fakeemail.ch fj293fbBnlQ!f9vNs - -- config.toml.tmpl -- [Webserver] HTTPPort = $PORT -- out.txt -- +ok HeadReporter +ok JobSpawner +ok Mailbox.Monitor +ok Mercury.WSRPCPool +ok Mercury.WSRPCPool.CacheSet +ok PipelineORM +ok PipelineRunner +ok PipelineRunner.BridgeCache +ok TelemetryManager + -- out.json -- +{ + "data": [ + { + "type": "checks", + "id": "HeadReporter", + "attributes": { + "name": "HeadReporter", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "JobSpawner", + "attributes": { + "name": "JobSpawner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mailbox.Monitor", + "attributes": { + "name": "Mailbox.Monitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool", + "attributes": { + "name": "Mercury.WSRPCPool", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool.CacheSet", + "attributes": { + "name": "Mercury.WSRPCPool.CacheSet", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineORM", + "attributes": { + "name": "PipelineORM", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner", + "attributes": { + "name": "PipelineRunner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner.BridgeCache", + "attributes": { + "name": "PipelineRunner.BridgeCache", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "TelemetryManager", + "attributes": { + "name": "TelemetryManager", + "status": "passing", + "output": "" + } + } + ] +} \ No newline at end of file From bce02c3b342dfdf8badd0957f0925b8cfa50fc5e Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 29 Aug 2024 10:44:31 -0400 Subject: [PATCH 11/11] Fixes CI --- testdata/scripts/health/default.txtar | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar index 25dee9efdb1..8480345e273 100644 --- a/testdata/scripts/health/default.txtar +++ b/testdata/scripts/health/default.txtar @@ -1,17 +1,31 @@ +# start node +exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' +exec chainlink node -c config.toml start -p password -a creds & + +# initialize client +env NODEURL=http://localhost:$PORT +exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL +exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check + +exec chainlink --remote-node-url $NODEURL health cmp stdout out.txt + exec chainlink --remote-node-url $NODEURL health -json cp stdout compact.json exec jq . compact.json cmp stdout out.json + -- testdb.txt -- CL_DATABASE_URL -- testport.txt -- PORT + -- password -- T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ -- creds -- notreal@fakeemail.ch fj293fbBnlQ!f9vNs + -- config.toml.tmpl -- [Webserver] HTTPPort = $PORT