Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-420] Add rate-limiting to Dispatcher #14239

Merged
merged 18 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package remote

import (
"context"
"errors"
"fmt"
sync "sync"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

Expand All @@ -25,11 +27,13 @@ var (

// dispatcher en/decodes messages and routes traffic between peers and capabilities
type dispatcher struct {
cfg config.Dispatcher
peerWrapper p2ptypes.PeerWrapper
peer p2ptypes.Peer
peerID p2ptypes.PeerID
signer p2ptypes.Signer
registry core.CapabilitiesRegistry
rateLimiter *common.RateLimiter
receivers map[key]*receiver
mu sync.RWMutex
stopCh services.StopChan
Expand All @@ -44,17 +48,26 @@ type key struct {

var _ services.Service = &dispatcher{}

const supportedVersion = 1

func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) *dispatcher {
func NewDispatcher(cfg config.Dispatcher, peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) (*dispatcher, error) {
rl, err := common.NewRateLimiter(common.RateLimiterConfig{
GlobalRPS: cfg.RateLimit().GlobalRPS(),
GlobalBurst: cfg.RateLimit().GlobalBurst(),
PerSenderRPS: cfg.RateLimit().RPS(),
PerSenderBurst: cfg.RateLimit().Burst(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to create rate limiter")
}
return &dispatcher{
cfg: cfg,
peerWrapper: peerWrapper,
signer: signer,
registry: registry,
rateLimiter: rl,
receivers: make(map[key]*receiver),
stopCh: make(services.StopChan),
lggr: lggr.Named("Dispatcher"),
}
}, nil
}

func (d *dispatcher) Start(ctx context.Context) error {
Expand Down Expand Up @@ -85,14 +98,12 @@ var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.",
}, []string{"capabilityId", "donId"})

const receiverBufferSize = 10000

type receiver struct {
cancel context.CancelFunc
ch chan *remotetypes.MessageBody
ch chan *types.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotetypes.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
Expand All @@ -101,7 +112,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotety
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId)
}

receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize)
receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize())

ctx, cancelCtx := d.stopCh.NewCtx()
d.wg.Add(1)
Expand Down Expand Up @@ -139,8 +150,8 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
msgBody.Version = supportedVersion
func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
msgBody.Version = uint32(d.cfg.SupportedVersion())
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
msgBody.Timestamp = time.Now().UnixMilli()
Expand All @@ -152,7 +163,7 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo
if err != nil {
return err
}
msg := &remotetypes.Message{Signature: signature, Body: rawBody}
msg := &types.Message{Signature: signature, Body: rawBody}
rawMsg, err := proto.Marshal(msg)
if err != nil {
return err
Expand All @@ -168,6 +179,11 @@ func (d *dispatcher) receive() {
d.lggr.Info("stopped - exiting receive")
return
case msg := <-recvCh:
if !d.rateLimiter.Allow(msg.Sender.String()) {
d.lggr.Debugw("rate limit exceeded, dropping message", "sender", msg.Sender)
// TODO: do we just drop the message, or do we need to respond with an error?
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
continue
}
body, err := ValidateMessage(msg, d.peerID)
if err != nil {
d.lggr.Debugw("received invalid message", "error", err)
Expand All @@ -184,7 +200,7 @@ func (d *dispatcher) receive() {
continue
}

receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize
receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize())
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
Expand All @@ -195,7 +211,7 @@ func (d *dispatcher) receive() {
}
}

func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) {
func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *types.MessageBody, errType types.Error) {
if body == nil {
return
}
Expand Down
80 changes: 76 additions & 4 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
Expand All @@ -32,6 +33,47 @@ func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody)
r.ch <- msg
}

type testRateLimitConfig struct {
globalRPS float64
globalBurst int
rps float64
burst int
}

func (c testRateLimitConfig) GlobalRPS() float64 {
return c.globalRPS
}

func (c testRateLimitConfig) GlobalBurst() int {
return c.globalBurst
}

func (c testRateLimitConfig) RPS() float64 {
return c.rps
}

func (c testRateLimitConfig) Burst() int {
return c.burst
}

type testConfig struct {
supportedVersion int
receiverBufferSize int
rateLimit testRateLimitConfig
}

func (c testConfig) SupportedVersion() int {
return c.supportedVersion
}

func (c testConfig) ReceiverBufferSize() int {
return c.receiverBufferSize
}

func (c testConfig) RateLimit() config.DispatcherRateLimit {
return c.rateLimit
}

func TestDispatcher_CleanStartClose(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
Expand All @@ -44,7 +86,17 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
signer := mocks.NewSigner(t)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 3.0,
globalBurst: 3,
rps: 1.0,
burst: 2,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))
require.NoError(t, dispatcher.Close())
}
Expand All @@ -65,11 +117,21 @@ func TestDispatcher_Receive(t *testing.T) {
signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented"))
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 3.0,
globalBurst: 3,
rps: 1.0,
burst: 2,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))

rcv := newReceiver()
err := dispatcher.SetReceiver(capId1, donId1, rcv)
err = dispatcher.SetReceiver(capId1, donId1, rcv)
require.NoError(t, err)

// supported capability
Expand Down Expand Up @@ -113,7 +175,17 @@ func TestDispatcher_RespondWithError(t *testing.T) {
signer.On("Sign", mock.Anything).Return([]byte{}, nil)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 3.0,
globalBurst: 3,
rps: 1.0,
burst: 2,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))

// unknown capability
Expand Down
1 change: 1 addition & 0 deletions core/config/capabilities_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ type CapabilitiesExternalRegistry interface {

type Capabilities interface {
Peering() P2P
Dispatcher() Dispatcher
ExternalRegistry() CapabilitiesExternalRegistry
}
14 changes: 14 additions & 0 deletions core/config/dispatcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

type DispatcherRateLimit interface {
GlobalRPS() float64
GlobalBurst() int
RPS() float64
Burst() int
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
}

type Dispatcher interface {
SupportedVersion() int
ReceiverBufferSize() int
RateLimit() DispatcherRateLimit
}
41 changes: 41 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,8 +1434,49 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) {
}
}

type Dispatcher struct {
SupportedVersion *int
ReceiverBufferSize *int
RateLimit DispatcherRateLimit
}

func (d *Dispatcher) setFrom(f *Dispatcher) {
d.RateLimit.setFrom(&f.RateLimit)

if f.ReceiverBufferSize != nil {
d.ReceiverBufferSize = f.ReceiverBufferSize
}

if f.SupportedVersion != nil {
d.SupportedVersion = f.SupportedVersion
}
}

type DispatcherRateLimit struct {
GlobalRPS *float64
GlobalBurst *int
RPS *float64
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
Burst *int
}

func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) {
if f.GlobalRPS != nil {
drl.GlobalRPS = f.GlobalRPS
}
if f.GlobalBurst != nil {
drl.GlobalBurst = f.GlobalBurst
}
if f.RPS != nil {
drl.RPS = f.RPS
}
if f.Burst != nil {
drl.Burst = f.Burst
}
}

type Capabilities struct {
Peering P2P `toml:",omitempty"`
Dispatcher Dispatcher `toml:"omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
}

Expand Down
5 changes: 4 additions & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer
remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
remoteDispatcher, err := remote.NewDispatcher(cfg.Capabilities().Dispatcher(), externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
if err != nil {
return nil, fmt.Errorf("could not create dispatcher: %w", err)
}
dispatcher = remoteDispatcher
} else {
dispatcher = opts.CapabilitiesDispatcher
Expand Down
Loading