Skip to content

Commit

Permalink
Refactor subcollector tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kuskoman committed Apr 27, 2023
1 parent abf3537 commit 518ca46
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 98 deletions.
77 changes: 0 additions & 77 deletions collectors/nodestats/nodestats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,80 +152,3 @@ func TestCollectError(t *testing.T) {
t.Error("Expected err not to be nil")
}
}

func TestIsPipelineHealthy(t *testing.T) {
collector := NewPipelineSubcollector()
tests := []struct {
name string
stats responses.PipelineReloadResponse
expected float64
}{
{
name: "Both timestamps nil",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: nil,
LastSuccessTimestamp: nil,
},
expected: 1,
},
{
name: "Failure timestamp set",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: nil,
},
expected: 0,
},
{
name: "Success timestamp earlier than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: func() *time.Time { t := time.Time{}.Add(-1 * time.Hour); return &t }(),
},
expected: 0,
},
{
name: "Success timestamp later than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: func() *time.Time { t := time.Time{}.Add(1 * time.Hour); return &t }(),
},
expected: 1,
},
{
name: "Missing fields, assume healthy",
stats: responses.PipelineReloadResponse{},
expected: 1,
},
}

// Run test cases
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := collector.isPipelineHealthy(tt.stats)
if result != tt.expected {
t.Errorf("Expected %v, but got %v", tt.expected, result)
return
}
})
}
}

func TestTruncatePluginId(t *testing.T) {
testCases := []struct {
input string
output string
}{
{"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404", "b6e6e404"},
{"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f", "502c819f"},
{"pipeline_custom_filter_foobar", "pipeline_custom_filter_foobar"},
{"filter_0001", "filter_0001"},
}

for _, tc := range testCases {
got := TruncatePluginId(tc.input)
if got != tc.output {
t.Errorf("TruncatePluginId(%v) = %v; want %v", tc.input, got, tc.output)
}
}
}
57 changes: 36 additions & 21 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/kuskoman/logstash-exporter/prometheus_helper"
)

const (
CollectorUnhealthy = 0
CollectorHealthy = 1
)

// PipelineSubcollector is a subcollector that collects metrics about the
// pipelines of a logstash node.
// The collector is created once for each pipeline of the node.
Expand All @@ -32,9 +37,9 @@ type PipelineSubcollector struct {
QueueEventsQueueSize *prometheus.Desc
QueueMaxQueueSizeInBytes *prometheus.Desc

PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc
}

Expand All @@ -58,11 +63,10 @@ func NewPipelineSubcollector() *PipelineSubcollector {
QueueEventsQueueSize: descHelper.NewDescWithHelpAndLabels("queue_events_queue_size", "Number of events that the queue can accommodate", "pipeline"),
QueueMaxQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("queue_max_size_in_bytes", "Maximum size of given queue in bytes.", "pipeline"),

PipelinePluginEventsIn: descHelper.NewDescWithHelpAndLabels("plugin_events_in", "Number of events received this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

PipelinePluginEventsIn: descHelper.NewDescWithHelpAndLabels("plugin_events_in", "Number of events received this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),
}
}

Expand Down Expand Up @@ -140,22 +144,33 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
log.Printf("collected pipeline stats for pipeline %s in %s", pipelineID, collectingEnd.Sub(collectingStart))
}

// isPipelineHealthy returns 1 if the pipeline is healthy, 0 if it is not
// A pipeline is considered healthy if:
// 1. last_failure_timestamp is nil
// 2. last_failure_timestamp is set and last_success_timestamp is nil
// 3. last_success_timestamp < last_failure_timestamp
// 4. last_success_timestamp > last_failure_timestamp
// 5. last_failure_timestamp and last_success_timestamp are either missing (likely due to version incompatibility)
// or set to the same value (likely due to a bug in the pipeline):
// lacking information, assume healthy
func (collector *PipelineSubcollector) isPipelineHealthy(pipeReloadStats responses.PipelineReloadResponse) float64 {
// 1. If last failure timestamp (or both) are nil, the pipeline is healthy
if pipeReloadStats.LastFailureTimestamp == nil {
return 1
// 2. If last_failure_timestamp is set and last success timestamp is nil, the pipeline is unhealthy
} else if pipeReloadStats.LastFailureTimestamp != nil && pipeReloadStats.LastSuccessTimestamp == nil {
return 0
// 3. If last_success_timestamp < last_failure_timestamp, the pipeline is unhealthy
} else if pipeReloadStats.LastSuccessTimestamp.Before(*pipeReloadStats.LastFailureTimestamp) {
return 0
// 4. If last_success_timestamp > last_failure_timestamp, the pipeline is healthy
} else if pipeReloadStats.LastSuccessTimestamp.After(*pipeReloadStats.LastFailureTimestamp) {
return 1
return CollectorHealthy
}

if pipeReloadStats.LastFailureTimestamp != nil && pipeReloadStats.LastSuccessTimestamp == nil {
return CollectorUnhealthy
}
// Missing field, likely due to version incompatibility - lacking information, assume healthy
return 1

if pipeReloadStats.LastSuccessTimestamp.Before(*pipeReloadStats.LastFailureTimestamp) {
return CollectorUnhealthy
}

if pipeReloadStats.LastSuccessTimestamp.After(*pipeReloadStats.LastFailureTimestamp) {
return CollectorHealthy
}

return CollectorHealthy
}

// Plugins have non-unique names, so use both name and id as labels
Expand Down
96 changes: 96 additions & 0 deletions collectors/nodestats/pipeline_subcollector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package nodestats

import (
"testing"
"time"

"github.com/kuskoman/logstash-exporter/fetcher/responses"
)

func TestIsPipelineHealthy(t *testing.T) {
t.Parallel()
collector := NewPipelineSubcollector()
tests := []struct {
name string
stats responses.PipelineReloadResponse
expected float64
}{
{
name: "Both timestamps nil",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: nil,
LastSuccessTimestamp: nil,
},
expected: CollectorHealthy,
},
{
name: "Failure timestamp set",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: nil,
},
expected: CollectorUnhealthy,
},
{
name: "Success timestamp earlier than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: func() *time.Time { t := time.Time{}.Add(-1 * time.Hour); return &t }(),
},
expected: CollectorUnhealthy,
},
{
name: "Success timestamp later than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &time.Time{},
LastSuccessTimestamp: func() *time.Time { t := time.Time{}.Add(1 * time.Hour); return &t }(),
},
expected: CollectorHealthy,
},
{
name: "Missing fields, assume healthy",
stats: responses.PipelineReloadResponse{},
expected: CollectorHealthy,
},
{
name: "Success timestamp equal to failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: func() *time.Time { t := time.Now(); return &t }(),
LastSuccessTimestamp: func() *time.Time { t := time.Now(); return &t }(),
},
expected: CollectorHealthy,
},
}

// Run test cases
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
localTestCase := testCase
t.Parallel()
result := collector.isPipelineHealthy(localTestCase.stats)
if result != localTestCase.expected {
t.Errorf("Expected %v, but got %v", localTestCase.expected, result)
return
}
})
}
}

func TestTruncatePluginId(t *testing.T) {
testCases := []struct {
input string
output string
}{
{"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404", "b6e6e404"},
{"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f", "502c819f"},
{"pipeline_custom_filter_foobar", "pipeline_custom_filter_foobar"},
{"filter_0001", "filter_0001"},
}

for _, tc := range testCases {
got := TruncatePluginId(tc.input)
if got != tc.output {
t.Errorf("TruncatePluginId(%v) = %v; want %v", tc.input, got, tc.output)
}
}
}

0 comments on commit 518ca46

Please sign in to comment.