Skip to content

Commit

Permalink
WIP: Get single vertex stats API to return desired structure
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Mar 9, 2019
1 parent 1885486 commit 7abc2ba
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,44 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI

/**
 * The UI needs a single vertex of the requested pipeline version, with its timeseries metrics attached.
 * The stateDocument object provides the list of vertices while the vertexStatsAggregation object provides
 * the timeseries metrics for the requested vertex. This function stitches the two together and returns the
 * enriched vertex.
 *
 * The matching vertex object inside stateDocument is modified in place: its `stats` property is replaced with
 * an object of the form `{ [statName]: { data: [ [timestamp, value], ... ] } }`, with one data point per
 * timeseries bucket, in bucket order.
 *
 * @param {Object} stateDocument Document whose `logstash_state.pipeline.representation.graph.vertices` lists the vertices
 * @param {Object} vertexStatsAggregation Response whose `aggregations.timeseries.buckets` hold the per-interval stats
 * @param {String} vertexId ID of the vertex to enrich
 * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
 * @returns {Object|undefined} The enriched vertex, or undefined if no vertex with the given ID exists
 */
export function _enrichVertexStateWithStatsAggregation(stateDocument, vertexStatsAggregation, vertexId, timeseriesIntervalInSeconds) {
  const logstashState = stateDocument.logstash_state;
  const vertices = logstashState.pipeline.representation.graph.vertices;

  // First, pick out the vertex we care about. This must be a comparison (===): an assignment here would
  // match the first vertex for any truthy vertexId and overwrite that vertex's id as a side effect.
  const vertex = vertices.find(v => v.id === vertexId);
  if (vertex === undefined) {
    // Nothing to enrich; leave stateDocument untouched and let the caller decide how to report it.
    return undefined;
  }
  vertex.stats = {};

  // Total duration is needed for computing each timeseries bucket's stats for the vertex.
  // TODO: replace this hard-coded placeholder with the real value once the aggregation provides it, e.g.:
  //   const totalDurationStats = vertexStatsAggregation.aggregations.pipelines.scoped.total_processor_duration_stats;
  //   const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
  const totalProcessorsDurationInMillis = 100000;

  // Next, iterate over timeseries metrics and attach them to the vertex
  const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
  timeSeriesBuckets.forEach(timeSeriesBucket => {
    const timestamp = timeSeriesBucket.key;

    const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
    const vertexStats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);

    // Each stat becomes its own series; the series is created the first time the stat is seen.
    Object.keys(vertexStats).forEach(stat => {
      if (!Object.prototype.hasOwnProperty.call(vertex.stats, stat)) {
        vertex.stats[stat] = { data: [] };
      }
      vertex.stats[stat].data.push([ timestamp, vertexStats[stat] ]);
    });
  });

  return vertex;
}

export async function getPipelineVertex(req, config, lsIndexPattern, clusterUuid, pipelineId, pipelineHash, vertexId) {
Expand Down Expand Up @@ -115,5 +119,5 @@ export async function getPipelineVertex(req, config, lsIndexPattern, clusterUuid
return boom.notFound(`Pipeline [${pipelineId} @ ${version.hash}] not found in the selected time range for cluster [${clusterUuid}].`);
}

return _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesInterval);
return _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, vertexId, timeseriesInterval);
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ function fetchPipelineVertexTimeSeriesStats(query, logstashIndexPattern, pipelin
size: 0,
ignoreUnavailable: true,
filterPath: [
'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.key',
'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.events_in_total',
'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.events_out_total',
'aggregations.pipelines.timeseries.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
'aggregations.timeseries.buckets.key',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_in_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_out_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.duration_in_millis_total',
'aggregations.pipelines.scoped.total_processor_duration_stats'
],
body: {
Expand Down

0 comments on commit 7abc2ba

Please sign in to comment.