Skip to content

Commit

Permalink
rpc: backport experimental buffer size control parameters from #7230 …
Browse files Browse the repository at this point in the history
…(tm v0.35.x) (#7276)

* Update error message to correspond to changes in v0.34.x
* Add buffer size and client-close config parameters

Signed-off-by: Thane Thomson <[email protected]>
(cherry picked from commit 035da42)
  • Loading branch information
thanethomson authored and lklimek committed Mar 25, 2022
1 parent ef5886e commit b4e6535
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Special thanks to external contributors on this release:

- CLI/RPC/Config

- [config] \#7276 rpc: Add experimental config params to allow for subscription buffer size control (@thanethomson).

- Apps

- P2P Protocol
Expand Down
40 changes: 40 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ var (

defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)

minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200
)

// Config defines the top level configuration for a Tendermint node
Expand Down Expand Up @@ -533,6 +536,29 @@ type RPCConfig struct {
// to the estimated maximum number of broadcast_tx_commit calls per block.
MaxSubscriptionsPerClient int `mapstructure:"max-subscriptions-per-client"`

// The number of events that can be buffered per subscription before
// returning `ErrOutOfCapacity`.
SubscriptionBufferSize int `mapstructure:"experimental-subscription-buffer-size"`

// The maximum number of responses that can be buffered per WebSocket
// client. If clients cannot read from the WebSocket endpoint fast enough,
// they will be disconnected, so increasing this parameter may reduce the
// chances of them being disconnected (but will cause the node to use more
// memory).
//
// Must be at least the same as `SubscriptionBufferSize`, otherwise
// connections may be dropped unnecessarily.
WebSocketWriteBufferSize int `mapstructure:"experimental-websocket-write-buffer-size"`

// If a WebSocket client cannot read fast enough, at present we may
// silently drop events instead of generating an error or disconnecting the
// client.
//
// Enabling this parameter will cause the WebSocket connection to be closed
// instead if it cannot read fast enough, allowing for greater
// predictability in subscription behavior.
CloseOnSlowClient bool `mapstructure:"experimental-close-on-slow-client"`

// How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 10s will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
Expand Down Expand Up @@ -582,7 +608,9 @@ func DefaultRPCConfig() *RPCConfig {

MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
Expand Down Expand Up @@ -616,6 +644,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max-subscriptions-per-client can't be negative")
}
if cfg.SubscriptionBufferSize < minSubscriptionBufferSize {
return fmt.Errorf(
"experimental-subscription-buffer-size must be >= %d",
minSubscriptionBufferSize,
)
}
if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize {
return fmt.Errorf(
"experimental-websocket-write-buffer-size must be >= experimental-subscription-buffer-size (%d)",
cfg.SubscriptionBufferSize,
)
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout-broadcast-tx-commit can't be negative")
}
Expand Down
27 changes: 27 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,33 @@ max-subscription-clients = {{ .RPC.MaxSubscriptionClients }}
# the estimated # maximum number of broadcast_tx_commit calls per block.
max-subscriptions-per-client = {{ .RPC.MaxSubscriptionsPerClient }}
# Experimental parameter to specify the maximum number of events a node will
# buffer, per subscription, before returning an error and closing the
# subscription. Must be set to at least 100, but higher values will accommodate
# higher event throughput rates (and will use more memory).
experimental-subscription-buffer-size = {{ .RPC.SubscriptionBufferSize }}
# Experimental parameter to specify the maximum number of RPC responses that
# can be buffered per WebSocket client. If clients cannot read from the
# WebSocket endpoint fast enough, they will be disconnected, so increasing this
# parameter may reduce the chances of them being disconnected (but will cause
# the node to use more memory).
#
# Must be at least the same as "experimental-subscription-buffer-size",
# otherwise connections could be dropped unnecessarily. This value should
# ideally be somewhat higher than "experimental-subscription-buffer-size" to
# accommodate non-subscription-related RPC responses.
experimental-websocket-write-buffer-size = {{ .RPC.WebSocketWriteBufferSize }}
# If a WebSocket client cannot read fast enough, at present we may
# silently drop events instead of generating an error or disconnecting the
# client.
#
# Enabling this experimental parameter will cause the WebSocket connection to
# be closed instead if it cannot read fast enough, allowing for greater
# predictability in subscription behavior.
experimental-close-on-slow-client = {{ .RPC.CloseOnSlowClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
Expand Down
19 changes: 15 additions & 4 deletions internal/rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
)

const (
// Buffer on the Tendermint (server) side to allow some slowness in clients.
subBufferSize = 100

// maxQueryLength is the maximum length of a query string that will be
// accepted. This is just a safety check to avoid outlandish queries.
maxQueryLength = 512
Expand Down Expand Up @@ -44,11 +41,13 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()

sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize)
sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize)
if err != nil {
return nil, err
}

closeIfSlow := env.Config.CloseOnSlowClient

// Capture the current ID, since it can change in the future.
subscriptionID := ctx.JSONReq.ID
go func() {
Expand All @@ -64,6 +63,18 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp
if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)

if closeIfSlow {
var (
err = errors.New("subscription was canceled (reason: slow client)")
resp = rpctypes.RPCServerError(subscriptionID, err)
)
if !ctx.WSConn.TryWriteRPCResponse(resp) {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
return
}
}
case <-sub.Canceled():
if sub.Err() != tmpubsub.ErrUnsubscribed {
Expand Down
2 changes: 1 addition & 1 deletion libs/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (

// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity")
)

// A Subscription represents a client subscription for a particular query and
Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
}
}),
rpcserver.ReadLimit(cfg.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
Expand Down

0 comments on commit b4e6535

Please sign in to comment.