From 1b7997d235cc0b1bd83d61f8b012d05131527834 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 19 Sep 2024 12:18:08 +0100 Subject: [PATCH] Global JetStream API request queue limiting Signed-off-by: Neil Twigg --- server/jetstream.go | 4 + server/jetstream_api.go | 25 +++- server/jetstream_cluster_4_test.go | 71 ++++++++++++ server/jetstream_events.go | 11 ++ server/opts.go | 176 +++++++++++++++-------------- server/opts_test.go | 13 ++- 6 files changed, 208 insertions(+), 92 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index ac7c8a1e11f..a83ded83a36 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -107,6 +107,7 @@ type jetStream struct { storeReserved int64 memUsed int64 storeUsed int64 + queueLimit int64 clustered int32 mu sync.RWMutex srv *Server @@ -416,6 +417,9 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { } s.gcbMu.Unlock() + // TODO: Not currently reloadable. + atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit) + s.js.Store(js) // FIXME(dlc) - Allow memory only operation? diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6bb29884fdf..4b96617401b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -307,6 +307,9 @@ const ( // JSAdvisoryServerRemoved notification that a server has been removed from the system. JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED" + // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit. + JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED" + // JSAuditAdvisory is a notification about JetStream API access. // FIXME - Add in details about who.. JSAuditAdvisory = "$JS.EVENT.ADVISORY.API" @@ -354,6 +357,10 @@ const JSMaxMetadataLen = 128 * 1024 // Picked 255 as it seems to be a widely used file name limit const JSMaxNameLen = 255 +// JSDefaultRequestQueueLimit is the default number of entries that we will +// put on the global request queue before we react. 
+const JSDefaultRequestQueueLimit = 10_000 + // Responses for API calls. // ApiResponse is a standard response from the JetStream JSON API @@ -862,10 +869,22 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Copy the state. Note the JSAPI only uses the hdr index to piece apart the // header from the msg body. No other references are needed. // Check pending and warn if getting backed up. - const warnThresh = 128 pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) - if pending >= warnThresh { - s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending) + limit := atomic.LoadInt64(&js.queueLimit) + if pending >= int(limit) { + s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) + s.jsAPIRoutedReqs.drain() + + s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ + TypedEvent: TypedEvent{ + Type: JSAPILimitReachedAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Server: s.Name(), + Domain: js.config.Domain, + Dropped: int64(pending), + }) } } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 13a941c10c7..79b62e13c24 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3832,3 +3832,74 @@ func TestJetStreamClusterAckDeleted(t *testing.T) { ) } } + +func TestJetStreamClusterAPILimitDefault(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + for _, s := range c.servers { + s.optsMu.RLock() + lim := s.opts.JetStreamRequestQueueLimit + s.optsMu.RUnlock() + + require_Equal(t, lim, JSDefaultRequestQueueLimit) + require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit) + } +} + +func TestJetStreamClusterAPILimitAdvisory(t *testing.T) { + // Hit the limit straight away. 
+ const queueLimit = 1 + + config := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + max_mem_store: 256MB + max_file_store: 2GB + store_dir: '%s' + request_queue_limit: ` + fmt.Sprintf("%d", queueLimit) + ` + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } + ` + c := createJetStreamClusterWithTemplate(t, config, "R3S", 3) + defer c.shutdown() + + c.waitOnLeader() + s := c.randomNonLeader() + + for _, s := range c.servers { + lim := atomic.LoadInt64(&s.getJetStream().queueLimit) + require_Equal(t, lim, queueLimit) + } + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + sub, err := snc.SubscribeSync(JSAdvisoryAPILimitReached) + require_NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + require_NoError(t, nc.PublishMsg(&nats.Msg{ + Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"), + Reply: nc.NewInbox(), + })) + + // Wait for the advisory to come in. + msg, err := sub.NextMsgWithContext(ctx) + require_NoError(t, err) + var advisory JSAPILimitReachedAdvisory + require_NoError(t, json.Unmarshal(msg.Data, &advisory)) + require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set. + require_Equal(t, advisory.Dropped, queueLimit) // Configured queue limit. +} diff --git a/server/jetstream_events.go b/server/jetstream_events.go index b39e7ccaee8..8e7300f1031 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -295,3 +295,14 @@ type JSServerRemovedAdvisory struct { Cluster string `json:"cluster"` Domain string `json:"domain,omitempty"` } + +// JSAPILimitReachedAdvisoryType is sent when the JS API request queue limit is reached. 
+const JSAPILimitReachedAdvisoryType = "io.nats.jetstream.advisory.v1.api_limit_reached" + // JSAPILimitReachedAdvisory is an advisory published when JetStream hits the queue length limit. +type JSAPILimitReachedAdvisory struct { + TypedEvent + Server string `json:"server"` // Server that created the event, name or ID + Domain string `json:"domain,omitempty"` // Domain the server belongs to + Dropped int64 `json:"dropped"` // How many messages did we drop from the queue +} diff --git a/server/opts.go b/server/opts.go index 6bf2cc8ef3d..64a797eb93b 100644 --- a/server/opts.go +++ b/server/opts.go @@ -277,89 +277,90 @@ type AuthCallout struct { // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type Options struct { - ConfigFile string `json:"-"` - ServerName string `json:"server_name"` - Host string `json:"addr"` - Port int `json:"port"` - DontListen bool `json:"dont_listen"` - ClientAdvertise string `json:"-"` - Trace bool `json:"-"` - Debug bool `json:"-"` - TraceVerbose bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - NoSublistCache bool `json:"-"` - NoHeaderSupport bool `json:"-"` - DisableShortFirstPing bool `json:"-"` - Logtime bool `json:"-"` - LogtimeUTC bool `json:"-"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - MaxSubTokens uint8 `json:"-"` - Nkeys []*NkeyUser `json:"-"` - Users []*User `json:"-"` - Accounts []*Account `json:"-"` - NoAuthUser string `json:"-"` - SystemAccount string `json:"-"` - NoSystemAccount bool `json:"-"` - Username string `json:"-"` - Password string `json:"-"` - Authorization string `json:"-"` - AuthCallout *AuthCallout `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout
float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int32 `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOpts `json:"cluster,omitempty"` - Gateway GatewayOpts `json:"gateway,omitempty"` - LeafNode LeafNodeOpts `json:"leaf,omitempty"` - JetStream bool `json:"jetstream"` - JetStreamMaxMemory int64 `json:"-"` - JetStreamMaxStore int64 `json:"-"` - JetStreamDomain string `json:"-"` - JetStreamExtHint string `json:"-"` - JetStreamKey string `json:"-"` - JetStreamOldKey string `json:"-"` - JetStreamCipher StoreCipher `json:"-"` - JetStreamUniqueTag string - JetStreamLimits JSLimitOpts - JetStreamTpm JSTpmOpts - JetStreamMaxCatchup int64 - StreamMaxBufferedMsgs int `json:"-"` - StreamMaxBufferedSize int64 `json:"-"` - StoreDir string `json:"-"` - SyncInterval time.Duration `json:"-"` - SyncAlways bool `json:"-"` - JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping - Websocket WebsocketOpts `json:"-"` - MQTT MQTTOpts `json:"-"` - ProfPort int `json:"-"` - ProfBlockRate int `json:"-"` - PidFile string `json:"-"` - PortsFileDir string `json:"-"` - LogFile string `json:"-"` - LogSizeLimit int64 `json:"-"` - LogMaxFiles int64 `json:"-"` - Syslog bool `json:"-"` - RemoteSyslog string `json:"-"` - Routes []*url.URL `json:"-"` - RoutesStr string `json:"-"` - TLSTimeout float64 `json:"tls_timeout"` - TLS bool `json:"-"` - TLSVerify bool `json:"-"` - TLSMap bool `json:"-"` - TLSCert string `json:"-"` - TLSKey string `json:"-"` - TLSCaCert string `json:"-"` - TLSConfig *tls.Config `json:"-"` - TLSPinnedCerts PinnedCertSet `json:"-"` - TLSRateLimit int64 `json:"-"` + ConfigFile string `json:"-"` + ServerName string `json:"server_name"` + Host string `json:"addr"` + Port int `json:"port"` + DontListen bool `json:"dont_listen"` + ClientAdvertise string `json:"-"` + Trace bool `json:"-"` + Debug bool `json:"-"` + TraceVerbose bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool 
`json:"-"` + NoSublistCache bool `json:"-"` + NoHeaderSupport bool `json:"-"` + DisableShortFirstPing bool `json:"-"` + Logtime bool `json:"-"` + LogtimeUTC bool `json:"-"` + MaxConn int `json:"max_connections"` + MaxSubs int `json:"max_subscriptions,omitempty"` + MaxSubTokens uint8 `json:"-"` + Nkeys []*NkeyUser `json:"-"` + Users []*User `json:"-"` + Accounts []*Account `json:"-"` + NoAuthUser string `json:"-"` + SystemAccount string `json:"-"` + NoSystemAccount bool `json:"-"` + Username string `json:"-"` + Password string `json:"-"` + Authorization string `json:"-"` + AuthCallout *AuthCallout `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPBasePath string `json:"http_base_path"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int32 `json:"max_control_line"` + MaxPayload int32 `json:"max_payload"` + MaxPending int64 `json:"max_pending"` + Cluster ClusterOpts `json:"cluster,omitempty"` + Gateway GatewayOpts `json:"gateway,omitempty"` + LeafNode LeafNodeOpts `json:"leaf,omitempty"` + JetStream bool `json:"jetstream"` + JetStreamMaxMemory int64 `json:"-"` + JetStreamMaxStore int64 `json:"-"` + JetStreamDomain string `json:"-"` + JetStreamExtHint string `json:"-"` + JetStreamKey string `json:"-"` + JetStreamOldKey string `json:"-"` + JetStreamCipher StoreCipher `json:"-"` + JetStreamUniqueTag string + JetStreamLimits JSLimitOpts + JetStreamTpm JSTpmOpts + JetStreamMaxCatchup int64 + JetStreamRequestQueueLimit int64 + StreamMaxBufferedMsgs int `json:"-"` + StreamMaxBufferedSize int64 `json:"-"` + StoreDir string `json:"-"` + SyncInterval time.Duration `json:"-"` + SyncAlways bool `json:"-"` + JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping + Websocket WebsocketOpts `json:"-"` + MQTT MQTTOpts `json:"-"` + ProfPort int `json:"-"` + ProfBlockRate int `json:"-"` + 
PidFile string `json:"-"` + PortsFileDir string `json:"-"` + LogFile string `json:"-"` + LogSizeLimit int64 `json:"-"` + LogMaxFiles int64 `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` + Routes []*url.URL `json:"-"` + RoutesStr string `json:"-"` + TLSTimeout float64 `json:"tls_timeout"` + TLS bool `json:"-"` + TLSVerify bool `json:"-"` + TLSMap bool `json:"-"` + TLSCert string `json:"-"` + TLSKey string `json:"-"` + TLSCaCert string `json:"-"` + TLSConfig *tls.Config `json:"-"` + TLSPinnedCerts PinnedCertSet `json:"-"` + TLSRateLimit int64 `json:"-"` // When set to true, the server will perform the TLS handshake before // sending the INFO protocol. For clients that are not configured // with a similar option, their connection will fail with some sort @@ -2417,6 +2418,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.StreamMaxBufferedMsgs = int(mlen) + case "request_queue_limit": + lim, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + opts.JetStreamRequestQueueLimit = lim default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -5604,6 +5611,9 @@ func setBaselineOptions(opts *Options) { if opts.SyncInterval == 0 && !opts.syncSet { opts.SyncInterval = defaultSyncInterval } + if opts.JetStreamRequestQueueLimit <= 0 { + opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit + } } func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 { diff --git a/server/opts_test.go b/server/opts_test.go index 8231ee3bde9..96e91d32442 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -67,12 +67,13 @@ func TestDefaultOptions(t *testing.T) { LeafNode: LeafNodeOpts{ ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT, }, - ConnectErrorReports: DEFAULT_CONNECT_ERROR_REPORTS, - ReconnectErrorReports: 
DEFAULT_RECONNECT_ERROR_REPORTS, - MaxTracedMsgLen: 0, - JetStreamMaxMemory: -1, - JetStreamMaxStore: -1, - SyncInterval: 2 * time.Minute, + ConnectErrorReports: DEFAULT_CONNECT_ERROR_REPORTS, + ReconnectErrorReports: DEFAULT_RECONNECT_ERROR_REPORTS, + MaxTracedMsgLen: 0, + JetStreamMaxMemory: -1, + JetStreamMaxStore: -1, + SyncInterval: 2 * time.Minute, + JetStreamRequestQueueLimit: JSDefaultRequestQueueLimit, } opts := &Options{}