diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java new file mode 100644 index 0000000000000..4e809aa5b444e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; + +/** + *

Metrics to measure ingest actions. + *

This counts measure documents and timings for a given scope. + * The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline, + * or you can use this class to count documents for a given pipeline or a specific processor. + * This class does not make assumptions about it's given scope. + */ +class IngestMetric { + + /** + * The time it takes to complete the measured item. + */ + private final MeanMetric ingestTime = new MeanMetric(); + /** + * The current count of things being measure. Should most likely ever be 0 or 1. + * Useful when aggregating multiple metrics to see how many things are in flight. + */ + private final CounterMetric ingestCurrent = new CounterMetric(); + /** + * The ever increasing count of things being measured + */ + private final CounterMetric ingestCount = new CounterMetric(); + /** + * The only increasing count of failures + */ + private final CounterMetric ingestFailed = new CounterMetric(); + + /** + * Call this prior to the ingest action. + */ + void preIngest() { + ingestCurrent.inc(); + } + + /** + * Call this after the performing the ingest action, even if the action failed. + * @param ingestTimeInMillis The time it took to perform the action. + */ + void postIngest(long ingestTimeInMillis) { + ingestCurrent.dec(); + ingestTime.inc(ingestTimeInMillis); + ingestCount.inc(); + } + + /** + * Call this if the ingest action failed. + */ + void ingestFailed() { + ingestFailed.inc(); + } + + /** + *

Add two sets of metrics together. + *

Note - this method does not add the current count values. + * The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct. + * + * @param metrics The metric to add. + */ + void add(IngestMetric metrics) { + ingestCount.inc(metrics.ingestCount.count()); + ingestTime.inc(metrics.ingestTime.sum()); + ingestFailed.inc(metrics.ingestFailed.count()); + } + + /** + * Creates a serializable representation for these metrics. + */ + IngestStats.Stats createStats() { + return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count()); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3cba98a45016a..5bc24a367da33 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -23,16 +23,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -50,8 +50,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -80,8 +78,7 @@ public class IngestService implements ClusterStateApplier { // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. private volatile Map pipelines = new HashMap<>(); private final ThreadPool threadPool; - private final StatsHolder totalStats = new StatsHolder(); - private volatile Map statsHolderPerPipeline = Collections.emptyMap(); + private final IngestMetric totalMetrics = new IngestMetric(); public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, @@ -258,10 +255,16 @@ Map pipelines() { @Override public void applyClusterState(final ClusterChangedEvent event) { ClusterState state = event.state(); + Map originalPipelines = pipelines; innerUpdatePipelines(event.previousState(), state); - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - if (ingestMetadata != null) { - updatePipelineStats(ingestMetadata); + //pipelines changed, so add the old metrics to the new metrics + if (originalPipelines != pipelines) { + pipelines.forEach((id, pipeline) -> { + Pipeline originalPipeline = originalPipelines.get(id); + if (originalPipeline != null) { + pipeline.getMetrics().add(originalPipeline.getMetrics()); + } + }); } } @@ -326,6 +329,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq public void executeBulkRequest(Iterable> actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler, Consumer itemDroppedHandler) { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override @@ -368,37 +372,11 @@ protected void doRun() { } public IngestStats stats() { - Map statsHolderPerPipeline = this.statsHolderPerPipeline; - Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); - for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { - statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); - } + Map statsPerPipeline = + pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); - return new IngestStats(totalStats.createStats(), statsPerPipeline); - } - - void updatePipelineStats(IngestMetadata ingestMetadata) { - boolean changed = false; - Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); - Iterator iterator = newStatsPerPipeline.keySet().iterator(); - while (iterator.hasNext()) { - String pipeline = iterator.next(); - if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { - iterator.remove(); - changed = true; - } - } - for (String pipeline : ingestMetadata.getPipelines().keySet()) { - if (newStatsPerPipeline.containsKey(pipeline) == false) { - newStatsPerPipeline.put(pipeline, new StatsHolder()); - changed = true; - } - } - - if (changed) { - statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); - } + return new IngestStats(totalMetrics.createStats(), statsPerPipeline); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { @@ -409,10 +387,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); try { - totalStats.preIngest(); - pipelineStats.ifPresent(StatsHolder::preIngest); + totalMetrics.preIngest(); String index = indexRequest.index(); String type = indexRequest.type(); String id = indexRequest.id(); @@ -438,13 +414,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer indexRequest.source(ingestDocument.getSourceAndMetadata()); } } catch (Exception e) { - totalStats.ingestFailed(); - pipelineStats.ifPresent(StatsHolder::ingestFailed); + totalMetrics.ingestFailed(); throw e; } finally { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalStats.postIngest(ingestTimeInMillis); - pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); + totalMetrics.postIngest(ingestTimeInMillis); } } @@ -481,27 +455,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state ExceptionsHelper.rethrowAndSuppress(exceptions); } - private static class StatsHolder { - - private final MeanMetric ingestMetric = new MeanMetric(); - private final CounterMetric ingestCurrent = new CounterMetric(); - private final CounterMetric ingestFailed = new CounterMetric(); - - void preIngest() { - ingestCurrent.inc(); - } - - void postIngest(long ingestTimeInMillis) { - ingestCurrent.dec(); - ingestMetric.inc(ingestTimeInMillis); - } - - void ingestFailed() { - ingestFailed.inc(); - } - - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); - } - } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 1d345ea5f7884..8d5f6d6ff7c54 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,10 +22,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; + import org.elasticsearch.script.ScriptService; /** @@ -44,12 +46,21 @@ public final class Pipeline { @Nullable private final Integer version; private final CompoundProcessor compoundProcessor; + private final IngestMetric metrics; + private final Clock clock; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { + this(id, description, version, compoundProcessor, Clock.systemUTC()); + } + + //package private for testing + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; + this.metrics = new IngestMetric(); + this.clock = clock; } public static Pipeline create(String id, Map config, @@ -78,7 +89,17 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - return compoundProcessor.execute(ingestDocument); + long startTimeInMillis = clock.millis(); + try { + metrics.preIngest(); + return compoundProcessor.execute(ingestDocument); + } catch (Exception e) { + metrics.ingestFailed(); + throw e; + } finally { + long ingestTimeInMillis = clock.millis() - startTimeInMillis; + metrics.postIngest(ingestTimeInMillis); + } } /** @@ -135,4 +156,11 @@ public List getOnFailureProcessors() { public List flattenAllProcessors() { return compoundProcessor.flattenProcessors(); } + + /** + * The metrics associated with this pipeline. + */ + public IngestMetric getMetrics() { + return metrics; + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 8d1302a2ada0e..afae36427ad17 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -19,16 +19,6 @@ package org.elasticsearch.ingest; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; @@ -59,13 +49,22 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -769,16 +768,14 @@ public void testStats() { previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); + indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); @@ -793,23 +790,21 @@ public void testStats() { assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); - } - // issue: https://github.com/elastic/elasticsearch/issues/18126 - public void testUpdatingStatsWhenRemovingPipelineWorks() { - IngestService ingestService = createWithProcessors(); - Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1")); - assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2")); - - configurationMap = new HashMap<>(); - configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); - assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); + //update cluster state and ensure that new stats are added to old stats + putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final IngestStats afterThirdRequestStats = ingestService.stats(); + assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); + } private IngestDocument eqIndexTypeId(final Map source) { diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 99fa7633d085a..018ded346d4fc 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -18,20 +18,17 @@ */ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.test.ESTestCase; + +import java.time.Clock; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineProcessor; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.test.ESTestCase; +import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -130,4 +127,81 @@ innerPipelineId, null, null, new CompoundProcessor() outerProc.execute(testIngestDocument); outerProc.execute(testIngestDocument); } + + public void testPipelineProcessorWithPipelineChain() throws Exception { + String pipeline1Id = "pipeline1"; + String pipeline2Id = "pipeline2"; + String pipeline3Id = "pipeline3"; + IngestService ingestService = mock(IngestService.class); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + Map pipeline1ProcessorConfig = new HashMap<>(); + pipeline1ProcessorConfig.put("pipeline", pipeline2Id); + PipelineProcessor pipeline1Processor = factory.create(Collections.emptyMap(), null, pipeline1ProcessorConfig); + + Map pipeline2ProcessorConfig = new HashMap<>(); + pipeline2ProcessorConfig.put("pipeline", pipeline3Id); + PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); + + Clock clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(0L); + Pipeline pipeline1 = new Pipeline( + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock + ); + + String key1 = randomAlphaOfLength(10); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(3L); + Pipeline pipeline2 = new Pipeline( + pipeline2Id, null, null, new CompoundProcessor(true, + Arrays.asList( + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key1, randomInt()); + }), + pipeline2Processor), + Collections.emptyList()), + clock + ); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(2L); + Pipeline pipeline3 = new Pipeline( + pipeline3Id, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { + throw new RuntimeException("error"); + })), clock + ); + when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); + when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3); + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + //start the chain + ingestDocument.executePipeline(pipeline1); + assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); + + //check the stats + IngestStats.Stats pipeline1Stats = pipeline1.getMetrics().createStats(); + IngestStats.Stats pipeline2Stats = pipeline2.getMetrics().createStats(); + IngestStats.Stats pipeline3Stats = pipeline3.getMetrics().createStats(); + + //current + assertThat(pipeline1Stats.getIngestCurrent(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestCurrent(), equalTo(0L)); + assertThat(pipeline3Stats.getIngestCurrent(), equalTo(0L)); + + //count + assertThat(pipeline1Stats.getIngestCount(), equalTo(1L)); + assertThat(pipeline2Stats.getIngestCount(), equalTo(1L)); + assertThat(pipeline3Stats.getIngestCount(), equalTo(1L)); + + //time + assertThat(pipeline1Stats.getIngestTimeInMillis(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestTimeInMillis(), equalTo(3L)); + assertThat(pipeline3Stats.getIngestTimeInMillis(), equalTo(2L)); + + //failure + assertThat(pipeline1Stats.getIngestFailedCount(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L)); + assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); + } }