Add metric for pipelines indicating if they are up (#94)
excalq committed Apr 27, 2023
1 parent 19d3914 commit ded28c1
Showing 5 changed files with 151 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -311,6 +311,7 @@ Table of exported metrics:
| logstash_stats_pipeline_reloads_successes | counter | Number of successful pipeline reloads. |
| logstash_stats_pipeline_reloads_last_failure_timestamp | gauge | Timestamp of last failed pipeline reload. |
| logstash_stats_pipeline_reloads_last_success_timestamp | gauge | Timestamp of last successful pipeline reload. |
| logstash_stats_pipeline_up | gauge | Whether the pipeline is up or not. |
| logstash_stats_process_cpu_load_average_1m | gauge | Total 1m system load average. |
| logstash_stats_process_cpu_load_average_5m | gauge | Total 5m system load average. |
| logstash_stats_process_cpu_load_average_15m | gauge | Total 15m system load average. |
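For context, the new gauge is emitted once per pipeline and labeled with the pipeline name, so a scrape of the exporter should include a line along these lines (the pipeline name "main" and the value shown here are illustrative, not taken from this commit):

logstash_stats_pipeline_up{pipeline="main"} 1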
20 changes: 1 addition & 19 deletions collectors/nodestats/nodestats_collector_test.go
@@ -67,6 +67,7 @@ func TestCollectNotNil(t *testing.T) {
"logstash_stats_jvm_threads_count",
"logstash_stats_jvm_threads_peak_count",
"logstash_stats_jvm_uptime_millis",
"logstash_stats_pipeline_up",
"logstash_stats_pipeline_events_duration",
"logstash_stats_pipeline_events_filtered",
"logstash_stats_pipeline_events_in",
@@ -151,22 +152,3 @@ func TestCollectError(t *testing.T) {
t.Error("Expected err not to be nil")
}
}

func TestTruncatePluginId(t *testing.T) {
testCases := []struct {
input string
output string
}{
{"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404", "b6e6e404"},
{"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f", "502c819f"},
{"pipeline_custom_filter_foobar", "pipeline_custom_filter_foobar"},
{"filter_0001", "filter_0001"},
}

for _, tc := range testCases {
got := TruncatePluginId(tc.input)
if got != tc.output {
t.Errorf("TruncatePluginId(%v) = %v; want %v", tc.input, got, tc.output)
}
}
}
55 changes: 47 additions & 8 deletions collectors/nodestats/pipeline_subcollector.go
@@ -12,10 +12,16 @@ import (
"github.com/kuskoman/logstash-exporter/prometheus_helper"
)

const (
CollectorUnhealthy = 0
CollectorHealthy = 1
)

// PipelineSubcollector is a subcollector that collects metrics about the
// pipelines of a logstash node.
// The collector is created once for each pipeline of the node.
type PipelineSubcollector struct {
Up *prometheus.Desc
EventsOut *prometheus.Desc
EventsFiltered *prometheus.Desc
EventsIn *prometheus.Desc
@@ -31,15 +37,16 @@ type PipelineSubcollector struct {
QueueEventsQueueSize *prometheus.Desc
QueueMaxQueueSizeInBytes *prometheus.Desc

PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
descHelper := prometheus_helper.SimpleDescHelper{Namespace: namespace, Subsystem: fmt.Sprintf("%s_pipeline", subsystem)}
return &PipelineSubcollector{
Up: descHelper.NewDescWithHelpAndLabels("up", "Whether the pipeline is up or not.", "pipeline"),
EventsOut: descHelper.NewDescWithHelpAndLabels("events_out", "Number of events that have been processed by this pipeline.", "pipeline"),
EventsFiltered: descHelper.NewDescWithHelpAndLabels("events_filtered", "Number of events that have been filtered out by this pipeline.", "pipeline"),
EventsIn: descHelper.NewDescWithHelpAndLabels("events_in", "Number of events that have been inputted into this pipeline.", "pipeline"),
@@ -56,11 +63,10 @@ func NewPipelineSubcollector() *PipelineSubcollector {
QueueEventsQueueSize: descHelper.NewDescWithHelpAndLabels("queue_events_queue_size", "Number of events that the queue can accommodate", "pipeline"),
QueueMaxQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("queue_max_size_in_bytes", "Maximum size of given queue in bytes.", "pipeline"),

PipelinePluginEventsIn: descHelper.NewDescWithHelpAndLabels("plugin_events_in", "Number of events received by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

PipelinePluginEventsIn: descHelper.NewDescWithHelpAndLabels("plugin_events_in", "Number of events received by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),
}
}

@@ -74,6 +80,8 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.EventsDuration, prometheus.CounterValue, float64(pipeStats.Events.DurationInMillis), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsQueuePushDuration, prometheus.CounterValue, float64(pipeStats.Events.QueuePushDurationInMillis), pipelineID)

ch <- prometheus.MustNewConstMetric(collector.Up, prometheus.GaugeValue, float64(collector.isPipelineHealthy(pipeStats.Reloads)), pipelineID)

ch <- prometheus.MustNewConstMetric(collector.ReloadsSuccesses, prometheus.CounterValue, float64(pipeStats.Reloads.Successes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.ReloadsFailures, prometheus.CounterValue, float64(pipeStats.Reloads.Failures), pipelineID)

@@ -129,6 +137,37 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
log.Printf("collected pipeline stats for pipeline %s in %s", pipelineID, collectingEnd.Sub(collectingStart))
}

// isPipelineHealthy returns 1 if the pipeline is healthy and 0 if it is not.
// A pipeline is considered healthy if:
// 1. last_failure_timestamp is nil
// 2. last_success_timestamp > last_failure_timestamp
// 3. last_failure_timestamp and last_success_timestamp are both missing (likely due to version incompatibility)
//    or set to the same value (likely due to a bug in the pipeline); lacking better information, assume healthy
//
// A pipeline is considered unhealthy if:
// 1. last_failure_timestamp is not nil and last_success_timestamp is nil
// 2. last_failure_timestamp > last_success_timestamp
func (collector *PipelineSubcollector) isPipelineHealthy(pipeReloadStats responses.PipelineReloadResponse) float64 {
if pipeReloadStats.LastFailureTimestamp == nil {
return CollectorHealthy
}

if pipeReloadStats.LastFailureTimestamp != nil && pipeReloadStats.LastSuccessTimestamp == nil {
return CollectorUnhealthy
}

if pipeReloadStats.LastSuccessTimestamp.Before(*pipeReloadStats.LastFailureTimestamp) {
return CollectorUnhealthy
}

if pipeReloadStats.LastSuccessTimestamp.After(*pipeReloadStats.LastFailureTimestamp) {
return CollectorHealthy
}

return CollectorHealthy
}

// Plugins have non-unique names, so use both name and id as labels
// By default ids are a 36-char UUID, optionally prefixed with a plugin type, or a 64-char SHA256 hash
// If the id is set by the user, keep it. If it's a UUID, truncate it to the last 8 chars (1% chance of collision per 9291)
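The body of TruncatePluginId is collapsed in this view. For orientation only, a minimal sketch of a helper consistent with the comment above and with the test cases in pipeline_subcollector_test.go might look like the following; the function name, the regular expression, and the exact heuristic are assumptions, not the repository's implementation:

package nodestats

import (
	"regexp"
	"strings"
)

// generatedIDSuffix matches ids that end in a UUID-like run of hex groups or a
// 64-char SHA256 hash; both are treated as machine-generated (assumed heuristic).
var generatedIDSuffix = regexp.MustCompile(`(?i)([0-9a-f]{8}(-[0-9a-f]{4,}){3,}|[0-9a-f]{64})$`)

// truncatePluginIdSketch keeps user-defined ids untouched and shortens
// machine-generated ones to their last 8 hex characters.
func truncatePluginIdSketch(id string) string {
	if !generatedIDSuffix.MatchString(id) {
		return id // looks user-defined: keep as-is
	}
	hexOnly := strings.ReplaceAll(id, "-", "")
	return hexOnly[len(hexOnly)-8:]
}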
101 changes: 101 additions & 0 deletions collectors/nodestats/pipeline_subcollector_test.go
@@ -0,0 +1,101 @@
package nodestats

import (
"testing"
"time"

"github.com/kuskoman/logstash-exporter/fetcher/responses"
)

func TestIsPipelineHealthy(t *testing.T) {
t.Parallel()
collector := NewPipelineSubcollector()

now := time.Now()
oneHourBefore := now.Add(-1 * time.Hour)
oneHourAfter := now.Add(1 * time.Hour)

tests := []struct {
name string
stats responses.PipelineReloadResponse
expected float64
}{
{
name: "Both timestamps nil",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: nil,
LastSuccessTimestamp: nil,
},
expected: 1,
},
{
name: "Failure timestamp set",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &now,
LastSuccessTimestamp: nil,
},
expected: 0,
},
{
name: "Success timestamp earlier than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &now,
LastSuccessTimestamp: &oneHourBefore,
},
expected: 0,
},
{
name: "Success timestamp later than failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &now,
LastSuccessTimestamp: &oneHourAfter,
},
expected: 1,
},
{
name: "Missing fields, assume healthy",
stats: responses.PipelineReloadResponse{},
expected: 1,
},
{
name: "Success timestamp equal to failure timestamp",
stats: responses.PipelineReloadResponse{
LastFailureTimestamp: &now,
LastSuccessTimestamp: &now,
},
expected: 1,
},
}

// Run test cases
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
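// copy the loop variable before calling t.Parallel, so each parallel subtest keeps its own test case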
localTestCase := testCase
t.Parallel()
result := collector.isPipelineHealthy(localTestCase.stats)
if result != localTestCase.expected {
t.Errorf("Expected %v, but got %v", localTestCase.expected, result)
return
}
})
}
}

func TestTruncatePluginId(t *testing.T) {
testCases := []struct {
input string
output string
}{
{"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404", "b6e6e404"},
{"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f", "502c819f"},
{"pipeline_custom_filter_foobar", "pipeline_custom_filter_foobar"},
{"filter_0001", "filter_0001"},
}

for _, tc := range testCases {
got := TruncatePluginId(tc.input)
if got != tc.output {
t.Errorf("TruncatePluginId(%v) = %v; want %v", tc.input, got, tc.output)
}
}
}
1 change: 1 addition & 0 deletions scripts/snapshots/metric_names.txt
@@ -14,6 +14,7 @@ logstash_stats_jvm_mem_non_heap_committed_bytes
logstash_stats_jvm_threads_count
logstash_stats_jvm_threads_peak_count
logstash_stats_jvm_uptime_millis
logstash_stats_pipeline_up
logstash_stats_pipeline_events_duration
logstash_stats_pipeline_events_filtered
logstash_stats_pipeline_events_in
