Skip to content

Commit

Permalink
(2.11) [ADDED] Monitoring: SlowConsumersStats in PING/STATSZ messages (
Browse files Browse the repository at this point in the history
…#5894)

(also fixed a flapper)

Resolves #5877

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
derekcollison authored Sep 17, 2024
2 parents 2faea26 + 8430ccd commit cec21b7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 30 deletions.
41 changes: 26 additions & 15 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,21 +329,22 @@ type ClientInfo struct {

// ServerStats hold various statistics that we will periodically send out.
type ServerStats struct {
Start time.Time `json:"start"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
ActiveAccounts int `json:"active_accounts"`
NumSubs uint32 `json:"subscriptions"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
Routes []*RouteStat `json:"routes,omitempty"`
Gateways []*GatewayStat `json:"gateways,omitempty"`
ActiveServers int `json:"active_servers,omitempty"`
JetStream *JetStreamVarz `json:"jetstream,omitempty"`
Start time.Time `json:"start"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
ActiveAccounts int `json:"active_accounts"`
NumSubs uint32 `json:"subscriptions"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats,omitempty"`
Routes []*RouteStat `json:"routes,omitempty"`
Gateways []*GatewayStat `json:"gateways,omitempty"`
ActiveServers int `json:"active_servers,omitempty"`
JetStream *JetStreamVarz `json:"jetstream,omitempty"`
}

// RouteStat holds route statistics.
Expand Down Expand Up @@ -880,6 +881,16 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs)
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
// Evaluate the slow consumer stats, but set it only if one of the value is not 0.
scs := &SlowConsumersStats{
Clients: s.NumSlowConsumersClients(),
Routes: s.NumSlowConsumersRoutes(),
Gateways: s.NumSlowConsumersGateways(),
Leafs: s.NumSlowConsumersLeafs(),
}
if scs.Clients != 0 || scs.Routes != 0 || scs.Gateways != 0 || scs.Leafs != 0 {
m.Stats.SlowConsumersStats = scs
}
m.Stats.NumSubs = s.numSubscriptions()
// Routes
s.forEachRoute(func(r *client) {
Expand Down
61 changes: 61 additions & 0 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3687,6 +3687,7 @@ func TestServerEventsStatsZJetStreamApiLevel(t *testing.T) {
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
require_NoError(t, err)
defer ncs.Close()

msg, err := ncs.Request("$SYS.REQ.SERVER.PING.STATSZ", nil, time.Second)
require_NoError(t, err)
Expand All @@ -3697,3 +3698,63 @@ func TestServerEventsStatsZJetStreamApiLevel(t *testing.T) {

require_Equal(t, stats.Stats.JetStream.Stats.API.Level, JSApiLevel)
}

func TestServerEventsPingStatsSlowConsumersStats(t *testing.T) {
s, _ := runTrustedServer(t)
defer s.Shutdown()

acc, akp := createAccount(s)
s.setSystemAccount(acc)
ncs, err := nats.Connect(s.ClientURL(), createUserCreds(t, s, akp))
require_NoError(t, err)
defer ncs.Close()

const statsz = "STATSZ"
for _, test := range []struct {
name string
f func() string
expectTwo bool
}{
{"server stats ping request subject", func() string { return serverStatsPingReqSubj }, true},
{"server ping request subject", func() string { return fmt.Sprintf(serverPingReqSubj, statsz) }, true},
{"server direct request subject", func() string { return fmt.Sprintf(serverDirectReqSubj, s.ID(), statsz) }, false},
} {
t.Run(test.name, func(t *testing.T) {
// Clear all slow consumers values
s.scStats.clients.Store(0)
s.scStats.routes.Store(0)
s.scStats.gateways.Store(0)
s.scStats.leafs.Store(0)

msg, err := ncs.Request(test.f(), nil, time.Second)
require_NoError(t, err)

var ssm ServerStatsMsg
err = json.Unmarshal(msg.Data, &ssm)
require_NoError(t, err)

// No slow consumer stats, so should be nil
require_True(t, ssm.Stats.SlowConsumersStats == nil)

// Now set some values
s.scStats.clients.Store(1)
s.scStats.routes.Store(2)
s.scStats.gateways.Store(3)
s.scStats.leafs.Store(4)

msg, err = ncs.Request(test.f(), nil, time.Second)
require_NoError(t, err)

ssm = ServerStatsMsg{}
err = json.Unmarshal(msg.Data, &ssm)
require_NoError(t, err)

require_NotNil(t, ssm.Stats.SlowConsumersStats)
scs := ssm.Stats.SlowConsumersStats
require_Equal(t, scs.Clients, 1)
require_Equal(t, scs.Routes, 2)
require_Equal(t, scs.Gateways, 3)
require_Equal(t, scs.Leafs, 4)
})
}
}
33 changes: 18 additions & 15 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10811,21 +10811,24 @@ func TestNoRaceJetStreamStandaloneDontReplyToAckBeforeProcessingIt(t *testing.T)
for i := 0; i < total; i++ {
go func() {
defer wg.Done()
msgs, err := sub.Fetch(1)
if err != nil {
errCh <- err
return
}
msg := msgs[0]
err = msg.AckSync()
if err != nil {
errCh <- err
return
}
_, err = js.Publish(msg.Subject, []byte("hello"))
if err != nil {
errCh <- err
return
for {
msgs, err := sub.Fetch(1)
if err != nil {
time.Sleep(5 * time.Millisecond)
continue
}
msg := msgs[0]
err = msg.AckSync()
if err != nil {
time.Sleep(5 * time.Millisecond)
continue
}
_, err = js.Publish(msg.Subject, []byte("hello"))
if err != nil {
errCh <- err
return
}
break
}
}()
}
Expand Down

0 comments on commit cec21b7

Please sign in to comment.