Merge pull request #119 from kuskoman/flow-metrics
Add support for pipeline flow metrics
kuskoman committed May 16, 2023
2 parents 284e7ff + 9c5243b commit 7326fea
Showing 5 changed files with 74 additions and 37 deletions.
14 changes: 14 additions & 0 deletions README.md
@@ -300,6 +300,20 @@ Table of exported metrics:
| logstash_stats_pipeline_events_in | counter | Number of events that have been inputted into this pipeline. |
| logstash_stats_pipeline_events_out | counter | Number of events that have been processed by this pipeline. |
| logstash_stats_pipeline_events_queue_push_duration | counter | Time needed to push events to the queue. |
| logstash_stats_pipeline_flow_filter_current | gauge | Current number of events in the filter queue. |
| logstash_stats_pipeline_flow_filter_lifetime | counter | Lifetime number of events in the filter queue. |
| logstash_stats_pipeline_flow_input_current | gauge | Current number of events in the input queue. |
| logstash_stats_pipeline_flow_input_lifetime | counter | Lifetime number of events in the input queue. |
| logstash_stats_pipeline_flow_output_current | gauge | Current number of events in the output queue. |
| logstash_stats_pipeline_flow_output_lifetime | counter | Lifetime number of events in the output queue. |
| logstash_stats_pipeline_flow_queue_backpressure_current | gauge | Current number of events in the backpressure queue. |
| logstash_stats_pipeline_flow_queue_backpressure_lifetime | counter | Lifetime number of events in the backpressure queue. |
| logstash_stats_pipeline_flow_worker_concurrency_current | gauge | Current number of workers. |
| logstash_stats_pipeline_flow_worker_concurrency_lifetime | counter | Lifetime number of workers. |
| logstash_stats_pipeline_plugin_bulk_requests_errors | counter | Number of bulk request errors. |
| logstash_stats_pipeline_plugin_bulk_requests_responses | counter | Bulk request HTTP response counts by code. |
| logstash_stats_pipeline_plugin_documents_non_retryable_failures | counter | Number of output events with non-retryable failures. |
| logstash_stats_pipeline_plugin_documents_successes | counter | Number of successful bulk requests. |
| logstash_stats_pipeline_plugin_events_duration | counter | Time spent processing events in this plugin. |
| logstash_stats_pipeline_plugin_events_in | counter | Number of events received by this pipeline. |
| logstash_stats_pipeline_plugin_events_out | counter | Number of events output by this pipeline. |
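The flow_* rows above are read from the "flow" section that Logstash reports per pipeline in its node stats API. To see the raw data behind them, that section can be fetched directly from a running Logstash instance; a minimal sketch, assuming the monitoring API listens on its default port 9600:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Default Logstash monitoring API address; adjust host/port to your deployment.
	resp, err := http.Get("http://localhost:9600/_node/stats/pipelines")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Each pipeline entry carries a "flow" object with input/filter/output
	// throughput, queue backpressure, and worker concurrency values, which
	// are the source of the metrics listed above.
	fmt.Println(string(body))
}
```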
50 changes: 42 additions & 8 deletions collectors/nodestats/pipeline_subcollector.go
@@ -42,10 +42,21 @@ type PipelineSubcollector struct {
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc

PipelinePluginDocumentsSuccesses *prometheus.Desc
PipelinePluginDocumentsNonRetryableFailures *prometheus.Desc
PipelinePluginBulkRequestErrors *prometheus.Desc
PipelinePluginBulkRequestResponses *prometheus.Desc

FlowInputCurrent *prometheus.Desc
FlowInputLifetime *prometheus.Desc
FlowFilterCurrent *prometheus.Desc
FlowFilterLifetime *prometheus.Desc
FlowOutputCurrent *prometheus.Desc
FlowOutputLifetime *prometheus.Desc
FlowQueueBackpressureCurrent *prometheus.Desc
FlowQueueBackpressureLifetime *prometheus.Desc
FlowWorkerConcurrencyCurrent *prometheus.Desc
FlowWorkerConcurrencyLifetime *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
@@ -73,10 +84,21 @@ func NewPipelineSubcollector() *PipelineSubcollector {
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

PipelinePluginDocumentsSuccesses: descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of successful bulk requests.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginDocumentsNonRetryableFailures: descHelper.NewDescWithHelpAndLabels("plugin_documents_non_retryable_failures", "Number of output events with non-retryable failures.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestErrors: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestResponses: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),

FlowInputCurrent: descHelper.NewDescWithHelpAndLabels("flow_input_current", "Current number of events in the input queue.", "pipeline"),
FlowInputLifetime: descHelper.NewDescWithHelpAndLabels("flow_input_lifetime", "Lifetime number of events in the input queue.", "pipeline"),
FlowFilterCurrent: descHelper.NewDescWithHelpAndLabels("flow_filter_current", "Current number of events in the filter queue.", "pipeline"),
FlowFilterLifetime: descHelper.NewDescWithHelpAndLabels("flow_filter_lifetime", "Lifetime number of events in the filter queue.", "pipeline"),
FlowOutputCurrent: descHelper.NewDescWithHelpAndLabels("flow_output_current", "Current number of events in the output queue.", "pipeline"),
FlowOutputLifetime: descHelper.NewDescWithHelpAndLabels("flow_output_lifetime", "Lifetime number of events in the output queue.", "pipeline"),
FlowQueueBackpressureCurrent: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_current", "Current number of events in the backpressure queue.", "pipeline"),
FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"),
FlowWorkerConcurrencyCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of workers.", "pipeline"),
FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"),
}
}
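For context: descHelper.NewDescWithHelpAndLabels is the exporter's own wrapper around prometheus.NewDesc and is not part of this diff. Judging by the exported series names (for example, "flow_input_current" surfacing as logstash_stats_pipeline_flow_input_current), it prefixes each short name with the collector's namespace and subsystem. A rough, hypothetical sketch of that behaviour, not the repository's actual helper:

```go
package nodestats

import "github.com/prometheus/client_golang/prometheus"

// descriptionHelper is a hypothetical stand-in for the exporter's real
// description helper, which lives elsewhere in the repository.
type descriptionHelper struct {
	namespace string // e.g. "logstash_stats"
	subsystem string // e.g. "pipeline"
}

// NewDescWithHelpAndLabels builds a prometheus.Desc whose fully qualified
// name is "<namespace>_<subsystem>_<name>", so "flow_input_current"
// becomes "logstash_stats_pipeline_flow_input_current".
func (h descriptionHelper) NewDescWithHelpAndLabels(name, help string, labels ...string) *prometheus.Desc {
	return prometheus.NewDesc(
		prometheus.BuildFQName(h.namespace, h.subsystem, name),
		help,
		labels,
		nil,
	)
}
```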

@@ -106,17 +128,29 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

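// Pipeline flow metrics: "current" values are exposed as gauges, "lifetime" values as counters.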
flowStats := pipeStats.Flow
ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, float64(flowStats.InputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.CounterValue, float64(flowStats.InputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, float64(flowStats.FilterThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.CounterValue, float64(flowStats.FilterThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, float64(flowStats.OutputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.CounterValue, float64(flowStats.OutputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, float64(flowStats.QueueBackpressure.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.CounterValue, float64(flowStats.QueueBackpressure.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID)

// Output error metrics
for _, output := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(output.ID)
pluginType := "output"
log.Printf("collecting output error stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, output.Name, pluginID)

// Response codes returned by output Bulk Requests
for code, count := range output.BulkRequests.Responses {
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestResponses, prometheus.CounterValue, float64(count), pipelineID, pluginType, output.Name, pluginID, code)
}

ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsSuccesses, prometheus.CounterValue, float64(output.Documents.Successes), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsNonRetryableFailures, prometheus.CounterValue, float64(output.Documents.NonRetryableFailures), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestErrors, prometheus.CounterValue, float64(output.BulkRequests.WithErrors), pipelineID, pluginType, output.Name, pluginID)
35 changes: 7 additions & 28 deletions fetcher/responses/nodestats_response.go
@@ -158,17 +158,17 @@ type SinglePipelineResponse struct {
DurationInMillis int `json:"duration_in_millis"`
} `json:"events"`
Documents struct {
Successes int `json:"successes"`
NonRetryableFailures int `json:"non_retryable_failures"`
} `json:"documents"`
BulkRequests struct {
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
} `json:"bulk_requests"`
} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue struct {
Type string `json:"type"`
EventsCount int `json:"events_count"`
QueueSizeInBytes int `json:"queue_size_in_bytes"`
@@ -186,36 +186,15 @@ type PipelineLogstashMonitoringResponse struct {
DurationInMillis int `json:"duration_in_millis"`
QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
} `json:"events"`
Flow struct {
OutputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"output_throughput"`
WorkerConcurrency struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_concurrency"`
InputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"input_throughput"`
FilterThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"filter_throughput"`
QueueBackpressure struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"queue_backpressure"`
} `json:"flow"`
Flow FlowResponse `json:"flow"`
Plugins struct {
Inputs []interface{} `json:"inputs"`
Codecs []interface{} `json:"codecs"`
Filters []interface{} `json:"filters"`
Outputs []interface{} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue interface{} `json:"queue,omitempty"`
}

type PipelineReloadResponse struct {
@@ -227,7 +206,7 @@ type PipelineReloadResponse struct {
}

type LastError struct {
Message string `json:"message"`
Backtrace []string `json:"backtrace"`
}

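The inline flow struct removed above is replaced by a shared FlowResponse type whose definition is not shown in this diff. Reconstructed from the JSON tags it replaces and from how the subcollector reads it, it presumably looks like the sketch below; the field grouping and the literal values are illustrative only:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlowMetric mirrors the "current"/"lifetime" pair reported for every flow
// metric (a sketch; the real types live in fetcher/responses).
type FlowMetric struct {
	Current  float64 `json:"current"`
	Lifetime float64 `json:"lifetime"`
}

// FlowResponse is a sketch of the shared flow type referenced above.
type FlowResponse struct {
	InputThroughput   FlowMetric `json:"input_throughput"`
	FilterThroughput  FlowMetric `json:"filter_throughput"`
	OutputThroughput  FlowMetric `json:"output_throughput"`
	QueueBackpressure FlowMetric `json:"queue_backpressure"`
	WorkerConcurrency FlowMetric `json:"worker_concurrency"`
}

func main() {
	// Example payload shaped like the "flow" object in the node stats API;
	// the numbers are made up for illustration.
	raw := `{
		"input_throughput":   {"current": 12.3, "lifetime": 10.1},
		"filter_throughput":  {"current": 12.1, "lifetime": 10.0},
		"output_throughput":  {"current": 12.0, "lifetime": 9.9},
		"queue_backpressure": {"current": 0.01, "lifetime": 0.02},
		"worker_concurrency": {"current": 2.0, "lifetime": 1.8}
	}`

	var flow FlowResponse
	if err := json.Unmarshal([]byte(raw), &flow); err != nil {
		panic(err)
	}
	fmt.Printf("input current=%.2f lifetime=%.2f\n",
		flow.InputThroughput.Current, flow.InputThroughput.Lifetime)
}
```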
2 changes: 1 addition & 1 deletion fixtures/node_stats.json
@@ -230,7 +230,7 @@
]
},
"reloads": {
"last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
"last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
"successes": 3,
"failures": 1,
"last_success_timestamp": "2023-04-20T22:30:32.437218256Z",
10 changes: 10 additions & 0 deletions scripts/snapshots/metric_names.txt
@@ -51,3 +51,13 @@ logstash_stats_pipeline_plugin_bulk_requests_errors
logstash_stats_pipeline_plugin_bulk_requests_responses
logstash_stats_pipeline_plugin_documents_non_retryable_failures
logstash_stats_pipeline_plugin_documents_successes
logstash_stats_pipeline_flow_filter_current
logstash_stats_pipeline_flow_filter_lifetime
logstash_stats_pipeline_flow_input_current
logstash_stats_pipeline_flow_input_lifetime
logstash_stats_pipeline_flow_output_current
logstash_stats_pipeline_flow_output_lifetime
logstash_stats_pipeline_flow_queue_backpressure_current
logstash_stats_pipeline_flow_queue_backpressure_lifetime
logstash_stats_pipeline_flow_worker_concurrency_current
logstash_stats_pipeline_flow_worker_concurrency_lifetime
