From 7abc2bab9ad948bec672ed094f0c686f087ae42e Mon Sep 17 00:00:00 2001
From: Shaunak Kashyap
Date: Sat, 9 Mar 2019 10:23:57 -0800
Subject: [PATCH] WIP: Get single vertex stats API to return desired structure

---
 .../lib/logstash/get_pipeline_vertex.js       | 52 ++++++++++---------
 .../get_pipeline_vertex_stats_aggregation.js  |  8 +--
 2 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js
index c58d13c47320b8d..718ebfc1b10e287 100644
--- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js
+++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js
@@ -50,40 +50,44 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
 
 /**
- * The UI needs a list of all vertices for the requested pipeline version, with each vertex in the list having its timeseries metrics associated with it. The
- * stateDocument object provides the list of vertices while the statsAggregation object provides the latest metrics for each of these vertices.
- * This function stitches the two together and returns the modified stateDocument object.
+ * The UI needs a single vertex from the requested pipeline version, with its timeseries metrics associated with it. The
+ * stateDocument object provides the pipeline's vertices while the vertexStatsAggregation object provides the timeseries metrics for the requested vertex.
+ * This function stitches the two together and returns the requested vertex, enriched with its stats.
  *
  * @param {Object} stateDocument
- * @param {Object} statsAggregation
- * @param {Object} First and last seen timestamps for pipeline version we're getting data for
+ * @param {Object} vertexStatsAggregation
+ * @param {String} vertexId The ID of the vertex to return stats for
  * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
  */
-export function _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesIntervalInSeconds) {
+export function _enrichVertexStateWithStatsAggregation(stateDocument, vertexStatsAggregation, vertexId, timeseriesIntervalInSeconds) {
   const logstashState = stateDocument.logstash_state;
   const vertices = logstashState.pipeline.representation.graph.vertices;
 
-  const verticesById = {};
-  vertices.forEach(vertex => {
-    verticesById[vertex.id] = vertex;
-    vertex.stats = {};
+  // First, find the vertex we care about
+  const vertex = vertices.find(v => v.id === vertexId);
+  vertex.stats = {};
+
+  // Gather the total duration stats, needed later to compute the stats for each timeseries bucket of the vertex
+  // const totalDurationStats = vertexStatsAggregation.aggregations.pipelines.scoped.total_processor_duration_stats;
+  // const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
+  // TODO
+  const totalProcessorsDurationInMillis = 100000;
+
+  // Next, iterate over the timeseries buckets and attach their metrics to the vertex
+  const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
+  timeSeriesBuckets.forEach(timeSeriesBucket => {
+    const timestamp = timeSeriesBucket.key;
+
+    const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
+    const vertexStats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
+    Object.keys(vertexStats).forEach(stat => {
+      if (!vertex.stats.hasOwnProperty(stat)) {
+        vertex.stats[stat] = { data: [] };
+      }
+      vertex.stats[stat].data.push([ timestamp, vertexStats[stat] ]);
+    });
   });
 
-  const totalDurationStats = statsAggregation.aggregations.pipelines.scoped.total_processor_duration_stats;
-  const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
-
-  const verticesWithStatsBuckets = statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets;
-  verticesWithStatsBuckets.forEach(vertexStatsBucket => {
-    // Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
-    const vertexId = vertexStatsBucket.key;
-    const vertex = verticesById[vertexId];
-
-    if (vertex !== undefined) {
-      // We extract this vertex's stats from vertexStatsBucket
-      vertex.stats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
-    }
-  });
-
-  return stateDocument.logstash_state;
+  return vertex;
 }
 
 export async function getPipelineVertex(req, config, lsIndexPattern, clusterUuid, pipelineId, pipelineHash, vertexId) {
@@ -115,5 +119,5 @@ export async function getPipelineVertex(req, config, lsIndexPattern, clusterUuid
     return boom.notFound(`Pipeline [${pipelineId} @ ${version.hash}] not found in the selected time range for cluster [${clusterUuid}].`);
   }
 
-  return _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesInterval);
+  return _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, vertexId, timeseriesInterval);
 }
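Note for reviewers: below is a minimal, self-contained sketch (runnable under Node) of the structure the enrichment above produces. computeVertexStats() and all sample values are hypothetical stand-ins for _vertexStats(), which additionally factors in totalProcessorsDurationInMillis (still hardcoded above); the aggregation nesting is collapsed here for brevity.

// Hypothetical stand-in for _vertexStats(); the real implementation also
// uses totalProcessorsDurationInMillis when deriving its metrics.
function computeVertexStats(vertexStatsBucket, timeseriesIntervalInSeconds) {
  return {
    events_in: vertexStatsBucket.events_in_total.value,
    events_out: vertexStatsBucket.events_out_total.value,
    throughput: vertexStatsBucket.events_out_total.value / timeseriesIntervalInSeconds,
  };
}

// One entry per timeseries bucket: a timestamp plus the metrics scoped to
// the single requested vertex (sample values invented).
const timeSeriesBuckets = [
  { key: 1552152000000, stats: { events_in_total: { value: 1000 }, events_out_total: { value: 990 } } },
  { key: 1552152030000, stats: { events_in_total: { value: 1200 }, events_out_total: { value: 1200 } } },
];

// The stitching loop from the patch, reduced to its essentials: each stat
// becomes a named timeseries of [timestamp, value] pairs on the vertex.
const vertex = { id: 'my-vertex-id', stats: {} };
timeSeriesBuckets.forEach(bucket => {
  const vertexStats = computeVertexStats(bucket.stats, 30);
  Object.keys(vertexStats).forEach(stat => {
    if (!vertex.stats.hasOwnProperty(stat)) {
      vertex.stats[stat] = { data: [] };
    }
    vertex.stats[stat].data.push([ bucket.key, vertexStats[stat] ]);
  });
});

console.log(JSON.stringify(vertex.stats.events_in));
// => {"data":[[1552152000000,1000],[1552152030000,1200]]}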
diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js
index a95c67884d59f04..e3709b8034d0b05 100644
--- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js
+++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex_stats_aggregation.js
@@ -127,10 +127,10 @@ function fetchPipelineVertexTimeSeriesStats(query, logstashIndexPattern, pipelin
     size: 0,
     ignoreUnavailable: true,
     filterPath: [
-      'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.key',
-      'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.events_in_total',
-      'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.events_out_total',
-      'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
+      'aggregations.timeseries.buckets.key',
+      'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_in_total',
+      'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_out_total',
+      'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.duration_in_millis_total',
       'aggregations.pipelines.scoped.total_processor_duration_stats'
     ],
     body: {
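The reordered filterPath above implies a reshaped response: the date_histogram is now the outermost aggregation, with the per-vertex metrics nested inside each bucket. A hedged sketch of what a filtered response might look like — field names are taken from the filterPath entries, all values are invented:

// Hypothetical filtered response, shaped per the new filterPath.
const response = {
  aggregations: {
    timeseries: {
      buckets: [
        {
          key: 1552152000000, // bucket timestamp, in ms
          pipelines: {
            scoped: {
              vertices: {
                vertex_id: {
                  events_in_total: { value: 1000 },
                  events_out_total: { value: 990 },
                  duration_in_millis_total: { value: 1500 },
                },
              },
            },
          },
        },
        // ...one bucket per timeseries interval
      ],
    },
    // Kept at the top level for the (currently commented-out) total
    // duration computation in _enrichVertexStateWithStatsAggregation().
    pipelines: {
      scoped: {
        total_processor_duration_stats: { min: 0, max: 100000 },
      },
    },
  },
};

// This is the path the enrichment loop walks for each bucket:
const firstBucket = response.aggregations.timeseries.buckets[0];
console.log(firstBucket.pipelines.scoped.vertices.vertex_id.events_in_total.value); // 1000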