Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional logging when shutting down the pipeline. #4986

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Pipeline(
this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads,
new PipelineThreadFactory(format("%s-sink-worker", name)), this);

this.pipelineShutdown = new PipelineShutdown(buffer);
this.pipelineShutdown = new PipelineShutdown(name, buffer);
}

AcknowledgementSetManager getAcknowledgementSetManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
Expand All @@ -15,19 +17,25 @@
import java.util.concurrent.atomic.AtomicBoolean;

class PipelineShutdown {
private static final Logger LOG = LoggerFactory.getLogger(PipelineShutdown.class);

private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final Duration bufferDrainTimeout;
private final Duration pipelineConfiguredBufferDrainTimeout;
private final Clock clock;
private final String pipelineName;
private Instant shutdownRequestedAt;
private Instant forceStopReadingBuffersAt;
private Duration bufferDrainTimeoutOverride;
private Duration bufferDrainTimeout;

PipelineShutdown(final Buffer<?> buffer) {
this(buffer, Clock.systemDefaultZone());
PipelineShutdown(final String pipelineName, final Buffer<?> buffer) {
this(pipelineName, buffer, Clock.systemDefaultZone());
}

PipelineShutdown(final Buffer<?> buffer, final Clock clock) {
bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
PipelineShutdown(String pipelineName, final Buffer<?> buffer, final Clock clock) {
this.pipelineName = pipelineName;
pipelineConfiguredBufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
bufferDrainTimeout = pipelineConfiguredBufferDrainTimeout;
this.clock = clock;
}

Expand All @@ -48,7 +56,11 @@ public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions
final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout();
if(bufferDrainTimeoutOverride != null) {
this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride;
bufferDrainTimeout = bufferDrainTimeoutOverride;
}

LOG.info("Started shutdown for pipeline {}. Requested at {}. Force stop reading buffers at {}. The buffer drain timeout to use is {}",
pipelineName, shutdownRequestedAt, forceStopReadingBuffersAt, bufferDrainTimeout);
}

boolean isStopRequested() {
Expand All @@ -60,8 +72,7 @@ boolean isForceStopReadingBuffers() {
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeoutOverride != null ?
bufferDrainTimeoutOverride : bufferDrainTimeout;
return bufferDrainTimeout;
}

private Instant now() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -37,17 +38,19 @@ class PipelineShutdownTest {

private Duration bufferDrainTimeout;
private Random random;
private String pipelineName;

@BeforeEach
void setUp() {
random = new Random();
pipelineName = UUID.randomUUID().toString();
bufferDrainTimeout = Duration.ofSeconds(random.nextInt(100) + 1_000);

when(buffer.getDrainTimeout()).thenReturn(bufferDrainTimeout);
}

private PipelineShutdown createObjectUnderTest() {
return new PipelineShutdown(buffer, clock);
return new PipelineShutdown(pipelineName, buffer, clock);
}

@Test
Expand Down
Loading