Skip to content

Commit

Permalink
kuskoman#93: Adds logstash_stats_pipeline_up metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
excalq committed Apr 20, 2023
1 parent 3ae9f36 commit dbb3cc4
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 18 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,10 @@ Table of exported metrics:
| logstash_stats_pipeline_queue_max_size_in_bytes | counter | Maximum size of given queue in bytes. |
| logstash_stats_pipeline_reloads_failures | counter | Number of failed pipeline reloads. |
| logstash_stats_pipeline_reloads_successes | counter | Number of successful pipeline reloads. |
| logstash_stats_process_cpu_load_average_15m | gauge | Total 15m system load average. |
| logstash_stats_pipeline_up | gauge | Whether the pipeline is up or not. |
| logstash_stats_process_cpu_load_average_1m | gauge | Total 1m system load average. |
| logstash_stats_process_cpu_load_average_5m | gauge | Total 5m system load average. |
| logstash_stats_process_cpu_load_average_15m | gauge | Total 15m system load average. |
| logstash_stats_process_cpu_percent | gauge | CPU usage of the process. |
| logstash_stats_process_cpu_total_millis | gauge | Total CPU time used by the process. |
| logstash_stats_process_max_file_descriptors | gauge | Limit of open file descriptors. |
Expand Down
1 change: 1 addition & 0 deletions collectors/nodestats/nodestats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestCollectNotNil(t *testing.T) {
"logstash_stats_jvm_threads_count",
"logstash_stats_jvm_threads_peak_count",
"logstash_stats_jvm_uptime_millis",
"logstash_stats_pipeline_up",
"logstash_stats_pipeline_events_duration",
"logstash_stats_pipeline_events_filtered",
"logstash_stats_pipeline_events_in",
Expand Down
22 changes: 21 additions & 1 deletion collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// pipelines of a logstash node.
// The collector is created once for each pipeline of the node.
type PipelineSubcollector struct {
Up *prometheus.Desc
EventsOut *prometheus.Desc
EventsFiltered *prometheus.Desc
EventsIn *prometheus.Desc
Expand All @@ -34,6 +35,7 @@ type PipelineSubcollector struct {
func NewPipelineSubcollector() *PipelineSubcollector {
descHelper := prometheus_helper.SimpleDescHelper{Namespace: namespace, Subsystem: fmt.Sprintf("%s_pipeline", subsystem)}
return &PipelineSubcollector{
Up: descHelper.NewDescWithHelpAndLabel("up", "Whether the pipeline is up or not.", "pipeline_id"),
EventsOut: descHelper.NewDescWithHelpAndLabel("events_out", "Number of events that have been processed by this pipeline.", "pipeline_id"),
EventsFiltered: descHelper.NewDescWithHelpAndLabel("events_filtered", "Number of events that have been filtered out by this pipeline.", "pipeline_id"),
EventsIn: descHelper.NewDescWithHelpAndLabel("events_in", "Number of events that have been inputted into this pipeline.", "pipeline_id"),
Expand All @@ -59,7 +61,7 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.EventsDuration, prometheus.CounterValue, float64(pipeStats.Events.DurationInMillis), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsQueuePushDuration, prometheus.CounterValue, float64(pipeStats.Events.QueuePushDurationInMillis), pipelineID)

// todo: add restart timestamps
ch <- prometheus.MustNewConstMetric(collector.Up, prometheus.GaugeValue, float64(collector.isPipelineHealthy(pipeStats.Reloads)), pipelineID)

ch <- prometheus.MustNewConstMetric(collector.ReloadsSuccesses, prometheus.CounterValue, float64(pipeStats.Reloads.Successes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.ReloadsFailures, prometheus.CounterValue, float64(pipeStats.Reloads.Failures), pipelineID)
Expand All @@ -71,3 +73,21 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
collectingEnd := time.Now()
log.Printf("collected pipeline stats for pipeline %s in %s", pipelineID, collectingEnd.Sub(collectingStart))
}

// isPipelineHealthy interprets a pipeline's reload timestamps as an up/down
// health value: 1 (healthy) or 0 (unhealthy).
//
// Rules, in order:
//  1. No reload has ever succeeded or failed -> healthy.
//  2. Only failures recorded -> unhealthy.
//  3. Only successes recorded -> healthy. (The original code skipped this
//     case and dereferenced the nil failure timestamp, panicking for any
//     pipeline whose reloads had all succeeded.)
//  4. Both recorded -> the most recent outcome wins.
//  5. Anything else (e.g. equal timestamps, or fields missing due to a
//     Logstash version incompatibility) -> assume healthy.
func (collector *PipelineSubcollector) isPipelineHealthy(pipeReloadStats responses.PipelineReloadResponse) float64 {
	success := pipeReloadStats.LastSuccessTimestamp
	failure := pipeReloadStats.LastFailureTimestamp

	switch {
	// 1. Never reloaded at all.
	case success == nil && failure == nil:
		return 1
	// 2. Failures only.
	case failure != nil && success == nil:
		return 0
	// 3. Successes only — guard added to avoid dereferencing a nil failure
	// timestamp below.
	case success != nil && failure == nil:
		return 1
	// 4. Both set: compare which happened last.
	case success.Before(*failure):
		return 0
	case success.After(*failure):
		return 1
	}
	// 5. Lacking decisive information — assume healthy.
	return 1
}
4 changes: 2 additions & 2 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ responses.NodeStatsResponse{
Outputs: {
},
},
Reloads: struct { LastFailureTimestamp interface {} "json:\"last_failure_timestamp\""; Successes int "json:\"successes\""; Failures int "json:\"failures\""; LastSuccessTimestamp interface {} "json:\"last_success_timestamp\""; LastError interface {} "json:\"last_error\"" }{},
Reloads: responses.PipelineReloadResponse{},
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{},
Hash: "",
EphemeralID: "",
Expand Down Expand Up @@ -125,7 +125,7 @@ responses.NodeStatsResponse{
},
},
},
Reloads: struct { LastFailureTimestamp interface {} "json:\"last_failure_timestamp\""; Successes int "json:\"successes\""; Failures int "json:\"failures\""; LastSuccessTimestamp interface {} "json:\"last_success_timestamp\""; LastError interface {} "json:\"last_error\"" }{},
Reloads: responses.PipelineReloadResponse{},
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{Type:"memory", EventsCount:0, QueueSizeInBytes:0, MaxQueueSizeInBytes:0},
Hash: "a73729cc9c29203931db21553c5edba063820a7e40d16cb5053be75cc3811a17",
EphemeralID: "a5c63d09-1ba6-4d67-90a5-075f468a7ab0",
Expand Down
24 changes: 10 additions & 14 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,7 @@ type SinglePipelineResponse struct {
} `json:"events"`
} `json:"outputs"`
} `json:"plugins"`
Reloads struct {
LastFailureTimestamp interface{} `json:"last_failure_timestamp"`
Successes int `json:"successes"`
Failures int `json:"failures"`
LastSuccessTimestamp interface{} `json:"last_success_timestamp"`
LastError interface{} `json:"last_error"`
} `json:"reloads"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue struct {
Type string `json:"type"`
EventsCount int `json:"events_count"`
Expand Down Expand Up @@ -212,16 +206,18 @@ type PipelineLogstashMonitoringResponse struct {
Filters []interface{} `json:"filters"`
Outputs []interface{} `json:"outputs"`
} `json:"plugins"`
Reloads struct {
LastFailureTimestamp *time.Time `json:"last_failure_timestamp,omitempty"`
Successes int `json:"successes"`
Failures int `json:"failures"`
LastSuccessTimestamp *time.Time `json:"last_success_timestamp,omitempty"`
LastError string `json:"last_error,omitempty"`
} `json:"reloads"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue interface{} `json:"queue,omitempty"`
}

// PipelineReloadResponse models the "reloads" object of a pipeline entry in
// the Logstash node-stats API response. The timestamp fields are pointers so
// that a missing/null JSON value decodes to nil rather than the zero time —
// callers (e.g. the pipeline health check) rely on nil to mean "never
// happened".
type PipelineReloadResponse struct {
	// Time of the most recent failed reload, or nil if none has failed.
	LastFailureTimestamp *time.Time `json:"last_failure_timestamp,omitempty"`
	// Count of successful pipeline reloads.
	Successes int `json:"successes"`
	// Count of failed pipeline reloads.
	Failures int `json:"failures"`
	// Time of the most recent successful reload, or nil if none has succeeded.
	LastSuccessTimestamp *time.Time `json:"last_success_timestamp,omitempty"`
	// Message of the last reload error, empty when there was none.
	LastError string `json:"last_error,omitempty"`
}

type ReloadResponse struct {
Successes int `json:"successes"`
Failures int `json:"failures"`
Expand Down
1 change: 1 addition & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ logstash_stats_jvm_mem_non_heap_committed_bytes
logstash_stats_jvm_threads_count
logstash_stats_jvm_threads_peak_count
logstash_stats_jvm_uptime_millis
logstash_stats_pipeline_up
logstash_stats_pipeline_events_duration
logstash_stats_pipeline_events_filtered
logstash_stats_pipeline_events_in
Expand Down

0 comments on commit dbb3cc4

Please sign in to comment.