diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index de22876041..c3d604601d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -137,7 +137,7 @@ public Pipeline( this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads, new PipelineThreadFactory(format("%s-sink-worker", name)), this); - this.pipelineShutdown = new PipelineShutdown(buffer); + this.pipelineShutdown = new PipelineShutdown(name, buffer); } AcknowledgementSetManager getAcknowledgementSetManager() { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java index f3731e9d67..7398e3f688 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java @@ -7,6 +7,8 @@ import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration; @@ -15,19 +17,25 @@ import java.util.concurrent.atomic.AtomicBoolean; class PipelineShutdown { + private static final Logger LOG = LoggerFactory.getLogger(PipelineShutdown.class); + private final AtomicBoolean stopRequested = new AtomicBoolean(false); - private final Duration bufferDrainTimeout; + private final Duration pipelineConfiguredBufferDrainTimeout; private final Clock clock; + private final String pipelineName; private Instant shutdownRequestedAt; private Instant forceStopReadingBuffersAt; private Duration bufferDrainTimeoutOverride; + private Duration bufferDrainTimeout; - PipelineShutdown(final Buffer buffer) { - this(buffer, Clock.systemDefaultZone()); + PipelineShutdown(final String pipelineName, final Buffer buffer) { + this(pipelineName, buffer, Clock.systemDefaultZone()); } - PipelineShutdown(final Buffer buffer, final Clock clock) { - bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout()); + PipelineShutdown(String pipelineName, final Buffer buffer, final Clock clock) { + this.pipelineName = pipelineName; + pipelineConfiguredBufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout()); + bufferDrainTimeout = pipelineConfiguredBufferDrainTimeout; this.clock = clock; } @@ -48,7 +56,11 @@ public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout(); if(bufferDrainTimeoutOverride != null) { this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride; + bufferDrainTimeout = bufferDrainTimeoutOverride; } + + LOG.info("Started shutdown for pipeline {}. Requested at {}. Force stop reading buffers at {}. The buffer drain timeout to use is {}", + pipelineName, shutdownRequestedAt, forceStopReadingBuffersAt, bufferDrainTimeout); } boolean isStopRequested() { @@ -60,8 +72,7 @@ boolean isForceStopReadingBuffers() { } public Duration getBufferDrainTimeout() { - return bufferDrainTimeoutOverride != null ? - bufferDrainTimeoutOverride : bufferDrainTimeout; + return bufferDrainTimeout; } private Instant now() { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java index 36ac4aa3d1..ca7d2bf1bd 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Random; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -37,17 +38,19 @@ class PipelineShutdownTest { private Duration bufferDrainTimeout; private Random random; + private String pipelineName; @BeforeEach void setUp() { random = new Random(); + pipelineName = UUID.randomUUID().toString(); bufferDrainTimeout = Duration.ofSeconds(random.nextInt(100) + 1_000); when(buffer.getDrainTimeout()).thenReturn(bufferDrainTimeout); } private PipelineShutdown createObjectUnderTest() { - return new PipelineShutdown(buffer, clock); + return new PipelineShutdown(pipelineName, buffer, clock); } @Test