Skip to content

Commit

Permalink
Global JetStream API request queue limiting (#5900)
Browse files Browse the repository at this point in the history
This should provide some protection against the server being completely
overwhelmed by a build-up of JS API requests.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison committed Sep 19, 2024
2 parents 0d285e8 + 1b7997d commit 9870b6a
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 92 deletions.
4 changes: 4 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type jetStream struct {
storeReserved int64
memUsed int64
storeUsed int64
queueLimit int64
clustered int32
mu sync.RWMutex
srv *Server
Expand Down Expand Up @@ -416,6 +417,9 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()

// TODO: Not currently reloadable.
atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit)

s.js.Store(js)

// FIXME(dlc) - Allow memory only operation?
Expand Down
25 changes: 22 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ const (
// JSAdvisoryServerRemoved notification that a server has been removed from the system.
JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"

// JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"

// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
Expand Down Expand Up @@ -354,6 +357,10 @@ const JSMaxMetadataLen = 128 * 1024
// Picked 255 as it seems to be a widely used file name limit
const JSMaxNameLen = 255

// JSDefaultRequestQueueLimit is the default number of entries that we will
// put on the global request queue before we react.
const JSDefaultRequestQueueLimit = 10_000

// Responses for API calls.

// ApiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -862,10 +869,22 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 128
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Type: JSAPILimitReachedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
})
}
}

Expand Down
71 changes: 71 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3832,3 +3832,74 @@ func TestJetStreamClusterAckDeleted(t *testing.T) {
)
}
}

func TestJetStreamClusterAPILimitDefault(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

for _, s := range c.servers {
s.optsMu.RLock()
lim := s.opts.JetStreamRequestQueueLimit
s.optsMu.RUnlock()

require_Equal(t, lim, JSDefaultRequestQueueLimit)
require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit)
}
}

func TestJetStreamClusterAPILimitAdvisory(t *testing.T) {
// Hit the limit straight away.
const queueLimit = 1

config := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {
max_mem_store: 256MB
max_file_store: 2GB
store_dir: '%s'
request_queue_limit: ` + fmt.Sprintf("%d", queueLimit) + `
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
c := createJetStreamClusterWithTemplate(t, config, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()
s := c.randomNonLeader()

for _, s := range c.servers {
lim := atomic.LoadInt64(&s.getJetStream().queueLimit)
require_Equal(t, lim, queueLimit)
}

nc, _ := jsClientConnect(t, s)
defer nc.Close()

snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

sub, err := snc.SubscribeSync(JSAdvisoryAPILimitReached)
require_NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

require_NoError(t, nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"),
Reply: nc.NewInbox(),
}))

// Wait for the advisory to come in.
msg, err := sub.NextMsgWithContext(ctx)
require_NoError(t, err)
var advisory JSAPILimitReachedAdvisory
require_NoError(t, json.Unmarshal(msg.Data, &advisory))
require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set.
require_Equal(t, advisory.Dropped, queueLimit) // Configured queue limit.
}
11 changes: 11 additions & 0 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,14 @@ type JSServerRemovedAdvisory struct {
Cluster string `json:"cluster"`
Domain string `json:"domain,omitempty"`
}

// JSAPILimitReachedAdvisoryType is sent when the JS API request queue limit is reached.
const JSAPILimitReachedAdvisoryType = "io.nats.jetstream.advisory.v1.api_limit_reached"

// JSAPILimitReachedAdvisory is a advisory published when JetStream hits the queue length limit.
type JSAPILimitReachedAdvisory struct {
TypedEvent
Server string `json:"server"` // Server that created the event, name or ID
Domain string `json:"domain,omitempty"` // Domain the server belongs to
Dropped int64 `json:"dropped"` // How many messages did we drop from the queue
}
176 changes: 93 additions & 83 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,89 +277,90 @@ type AuthCallout struct {
// NOTE: This structure is no longer used for monitoring endpoints
// and json tags are deprecated and may be removed in the future.
type Options struct {
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
NoSublistCache bool `json:"-"`
NoHeaderSupport bool `json:"-"`
DisableShortFirstPing bool `json:"-"`
Logtime bool `json:"-"`
LogtimeUTC bool `json:"-"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
MaxSubTokens uint8 `json:"-"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
NoAuthUser string `json:"-"`
SystemAccount string `json:"-"`
NoSystemAccount bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
AuthCallout *AuthCallout `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int32 `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf,omitempty"`
JetStream bool `json:"jetstream"`
JetStreamMaxMemory int64 `json:"-"`
JetStreamMaxStore int64 `json:"-"`
JetStreamDomain string `json:"-"`
JetStreamExtHint string `json:"-"`
JetStreamKey string `json:"-"`
JetStreamOldKey string `json:"-"`
JetStreamCipher StoreCipher `json:"-"`
JetStreamUniqueTag string
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
StreamMaxBufferedMsgs int `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfBlockRate int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
LogSizeLimit int64 `json:"-"`
LogMaxFiles int64 `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSMap bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSPinnedCerts PinnedCertSet `json:"-"`
TLSRateLimit int64 `json:"-"`
ConfigFile string `json:"-"`
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
TraceVerbose bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
NoSublistCache bool `json:"-"`
NoHeaderSupport bool `json:"-"`
DisableShortFirstPing bool `json:"-"`
Logtime bool `json:"-"`
LogtimeUTC bool `json:"-"`
MaxConn int `json:"max_connections"`
MaxSubs int `json:"max_subscriptions,omitempty"`
MaxSubTokens uint8 `json:"-"`
Nkeys []*NkeyUser `json:"-"`
Users []*User `json:"-"`
Accounts []*Account `json:"-"`
NoAuthUser string `json:"-"`
SystemAccount string `json:"-"`
NoSystemAccount bool `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
AuthCallout *AuthCallout `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPBasePath string `json:"http_base_path"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int32 `json:"max_control_line"`
MaxPayload int32 `json:"max_payload"`
MaxPending int64 `json:"max_pending"`
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf,omitempty"`
JetStream bool `json:"jetstream"`
JetStreamMaxMemory int64 `json:"-"`
JetStreamMaxStore int64 `json:"-"`
JetStreamDomain string `json:"-"`
JetStreamExtHint string `json:"-"`
JetStreamKey string `json:"-"`
JetStreamOldKey string `json:"-"`
JetStreamCipher StoreCipher `json:"-"`
JetStreamUniqueTag string
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
StreamMaxBufferedMsgs int `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfBlockRate int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
LogSizeLimit int64 `json:"-"`
LogMaxFiles int64 `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSMap bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSPinnedCerts PinnedCertSet `json:"-"`
TLSRateLimit int64 `json:"-"`
// When set to true, the server will perform the TLS handshake before
// sending the INFO protocol. For clients that are not configured
// with a similar option, their connection will fail with some sort
Expand Down Expand Up @@ -2417,6 +2418,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.StreamMaxBufferedMsgs = int(mlen)
case "request_queue_limit":
lim, ok := mv.(int64)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamRequestQueueLimit = lim
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -5604,6 +5611,9 @@ func setBaselineOptions(opts *Options) {
if opts.SyncInterval == 0 && !opts.syncSet {
opts.SyncInterval = defaultSyncInterval
}
if opts.JetStreamRequestQueueLimit <= 0 {
opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit
}
}

func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 {
Expand Down
13 changes: 7 additions & 6 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func TestDefaultOptions(t *testing.T) {
LeafNode: LeafNodeOpts{
ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT,
},
ConnectErrorReports: DEFAULT_CONNECT_ERROR_REPORTS,
ReconnectErrorReports: DEFAULT_RECONNECT_ERROR_REPORTS,
MaxTracedMsgLen: 0,
JetStreamMaxMemory: -1,
JetStreamMaxStore: -1,
SyncInterval: 2 * time.Minute,
ConnectErrorReports: DEFAULT_CONNECT_ERROR_REPORTS,
ReconnectErrorReports: DEFAULT_RECONNECT_ERROR_REPORTS,
MaxTracedMsgLen: 0,
JetStreamMaxMemory: -1,
JetStreamMaxStore: -1,
SyncInterval: 2 * time.Minute,
JetStreamRequestQueueLimit: JSDefaultRequestQueueLimit,
}

opts := &Options{}
Expand Down

0 comments on commit 9870b6a

Please sign in to comment.