KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies #12161

Status: Merged (Jun 16, 2022). 43 commits; changes shown from 10 of the 43 commits.
Commits:

207c4ef  KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (jnh5y, May 13, 2022)
b73708f  Cleaning up reviewer comments. (jnh5y, May 18, 2022)
070b867  Some work on fixing up the StoreChangelogReader. (jnh5y, May 19, 2022)
d019d7a  Updates to the IT. (jnh5y, May 23, 2022)
66fc07e  IntelliJ refactoring to fix complexity. (jnh5y, May 23, 2022)
e0a80bb  Adding unit tests for KafkaStreams. (jnh5y, May 23, 2022)
85e506d  Adding unit test showing that only non-paused, standby tasks are rest… (jnh5y, May 23, 2022)
a620621  Found the suppressions for JavaNCSS. (jnh5y, May 23, 2022)
0a37879  Added TaskExecutionMetadataTest. (jnh5y, May 23, 2022)
34ec8ac  Adding TopologyMetadataTest. (jnh5y, May 23, 2022)
fb016a8  Fixes to make the StoreChangelogReader work. (jnh5y, May 24, 2022)
6d57d04  Responding to reviewer comments. (jnh5y, May 24, 2022)
d007d15  Minor refactoring / renaming. (jnh5y, May 24, 2022)
d7be172  Update streams/src/test/java/org/apache/kafka/streams/integration/Pau… (jnh5y, Jun 1, 2022)
b2385ef  Merge remote-tracking branch 'origin/trunk' into kafka-13873 (jnh5y, Jun 1, 2022)
759d3a8  Using assertThrows. (jnh5y, Jun 1, 2022)
cfb18f5  Updating TaskExecutorMetadataTest. (jnh5y, Jun 1, 2022)
a3bd8ae  Extracted some code in StreamThreadTest.java. (jnh5y, Jun 1, 2022)
cb6d6f1  Refactoring StoreChangelogReader. (jnh5y, Jun 1, 2022)
722132f  Fixing a checkstyle issue. (jnh5y, Jun 1, 2022)
c8f8c74  commit 5d25096520503a733b5e520c022d25aaf9d939a7 (jnh5y, Jun 2, 2022)
337c995  Various clean up. (jnh5y, Jun 2, 2022)
0cedb80  Small clean up. (jnh5y, Jun 2, 2022)
d30e3b3  Adding PR IT and fixing an import. (jnh5y, Jun 2, 2022)
148da23  Test which shows that active tasks still restore. (jnh5y, Jun 2, 2022)
e467852  Pausing active task restoration. (jnh5y, Jun 3, 2022)
1bcc5c9  Updated PR IT with multiple client example. This is hitting SESSION_… (jnh5y, Jun 3, 2022)
08e332d  Updating the PauseResume IT so that the multiple instance test runs f… (jnh5y, Jun 3, 2022)
c495fb6  Fixing StoreChangelogReaderTest. (jnh5y, Jun 3, 2022)
771c296  Removing most comments. (jnh5y, Jun 3, 2022)
e076a07  Added a little getTopicSize function. (jnh5y, Jun 3, 2022)
f56dd99  Verifying Streams pausing by watching the poll count. (jnh5y, Jun 3, 2022)
6771854  Update streams/src/test/java/org/apache/kafka/streams/processor/inter… (jnh5y, Jun 9, 2022)
12342d5  Update streams/src/test/java/org/apache/kafka/streams/processor/inter… (jnh5y, Jun 9, 2022)
61a302b  Update streams/src/test/java/org/apache/kafka/streams/processor/inter… (jnh5y, Jun 9, 2022)
06c98ea  Updating import order and sorting out other issues. (jnh5y, Jun 9, 2022)
1cdbc15  Responding to more reviewer comments. (jnh5y, Jun 9, 2022)
ce707de  Small test changes before adding new test. (jnh5y, Jun 13, 2022)
769a53a  Partial unit test? (jnh5y, Jun 14, 2022)
ea77133  Added new unit test. (jnh5y, Jun 15, 2022)
f7799a5  Removing assertNoLag. (jnh5y, Jun 15, 2022)
e1c698b  Responding to reviewer comments. (jnh5y, Jun 15, 2022)
c5d7abc  Responding to reviewer comments. (jnh5y, Jun 15, 2022)
2 changes: 1 addition & 1 deletion  checkstyle/suppressions.xml

@@ -225,7 +225,7 @@
               files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

     <suppress checks="JavaNCSS"
-              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>

Comment (Contributor): In general, I think we should try to avoid adding suppressions. But I also see that StreamThreadTest would need quite some love at the moment which is not the intent of this PR.

     <suppress checks="NPathComplexity"
               files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
48 changes: 47 additions & 1 deletion  streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -56,7 +56,6 @@
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -65,6 +64,7 @@
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
@@ -111,6 +111,7 @@
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;

 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }

+    /**
+     * This method pauses processing for the KafkaStreams instance.
+     *
+     * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     * Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     * This method sets transient state that is not maintained or managed among instances.
+     * Note that pause() can be called before start() in order to start a KafkaStreams instance
+     * in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(namedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }

Comment (Contributor): Could you please add unit tests for the new methods?

Reply (Author, jnh5y, May 23, 2022): Yes! I added some in this commit e0a80bb.

+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies().stream()
+                .map(NamedTopology::name)
+                .allMatch(topologyMetadata::isPaused);
+        } else {
+            return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * This method resumes processing for the KafkaStreams instance.
+     */
+    public void resume() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.resumeTopology(namedTopology.name());
+            }
+        } else {
+            topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
+        }
+    }
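The new API above keeps one pause flag per topology and treats a plain (un-named) topology as a single sentinel entry. A minimal, self-contained model of those semantics, not the real KafkaStreams class, is sketched below; the class name and constructor are hypothetical, and only the UNNAMED_TOPOLOGY sentinel mirrors the real TopologyMetadata constant:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model of KafkaStreams pause()/isPaused()/resume() semantics.
// Each topology has its own pause flag; an instance without named topologies
// uses a single sentinel name, mirroring TopologyMetadata.UNNAMED_TOPOLOGY.
public class PauseSemanticsSketch {
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    private final Set<String> namedTopologies;
    private final Set<String> pausedTopologies = new HashSet<>();

    public PauseSemanticsSketch(final String... topologyNames) {
        this.namedTopologies = new HashSet<>(Arrays.asList(topologyNames));
    }

    private boolean hasNamedTopologies() {
        return !namedTopologies.isEmpty();
    }

    // pause() pauses every topology: all named ones, or the unnamed sentinel.
    public void pause() {
        if (hasNamedTopologies()) {
            pausedTopologies.addAll(namedTopologies);
        } else {
            pausedTopologies.add(UNNAMED_TOPOLOGY);
        }
    }

    // isPaused() is only true when *all* topologies are currently paused.
    public boolean isPaused() {
        if (hasNamedTopologies()) {
            return pausedTopologies.containsAll(namedTopologies);
        } else {
            return pausedTopologies.contains(UNNAMED_TOPOLOGY);
        }
    }

    // resume() clears every pause flag.
    public void resume() {
        pausedTopologies.clear();
    }

    public static void main(final String[] args) {
        final PauseSemanticsSketch streams = new PauseSemanticsSketch();
        streams.pause();
        System.out.println(streams.isPaused()); // prints true
        streams.resume();
        System.out.println(streams.isPaused()); // prints false
    }
}
```

Note that, as in the PR, the state is transient: nothing is persisted or coordinated across instances, so each instance must be paused individually.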

/**
* handle each stream thread in a snapshot of threads.
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
 public void restore(final Map<TaskId, Task> tasks) {
     initializeChangelogs(tasks, registeredChangelogs());

-    if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-        throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+    if (!activeRestoringChangelogs().isEmpty()
+        && state == ChangelogReaderState.STANDBY_UPDATING) {
+        throw new IllegalStateException(
+            "Should not be in standby updating state if there are still un-completed active changelogs");
     }

     if (allChangelogsCompleted()) {
         log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-        return;
-    }
-
-    final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
-    if (!restoringChangelogs.isEmpty()) {
-        final ConsumerRecords<byte[], byte[]> polledRecords;
-        try {
-            // for restoring active and updating standby we may prefer different poll time
-            // in order to make sure we call the main consumer#poll in time.
-            // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-            polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
-
-            // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
-            // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
-        } catch (final InvalidOffsetException e) {
-            log.warn("Encountered " + e.getClass().getName() +
-                " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
-                "the consumer's position has fallen out of the topic partition offset range because the topic was " +
-                "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                " it later.", e);
-
-            final Set<TaskId> corruptedTasks = new HashSet<>();
-            e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
-            throw new TaskCorruptedException(corruptedTasks, e);
-        } catch (final KafkaException e) {
-            throw new StreamsException("Restore consumer get unexpected error polling records.", e);
-        }
-
-        for (final TopicPartition partition : polledRecords.partitions()) {
-            bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
-        }
-
-        for (final TopicPartition partition : restoringChangelogs) {
-            // even if some partition do not have any accumulated data, we still trigger
-            // restoring since some changelog may not need to restore any at all, and the
-            // restore to end check needs to be executed still.
-            // TODO: we always try to restore as a batch when some records are accumulated, which may result in
-            // small batches; this can be optimized in the future, e.g. wait longer for larger batches.
-            final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-            try {
-                if (restoreChangelog(changelogs.get(partition))) {
-                    tasks.get(taskId).clearTaskTimeout();
-                }
-            } catch (final TimeoutException timeoutException) {
-                tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
-                    time.milliseconds(),
-                    timeoutException
-                );
-            }
-        }
-
-        maybeUpdateLimitOffsetsForStandbyChangelogs(tasks);
-
-        maybeLogRestorationProgress();
-    }
+    } else {
+        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
+        if (!restoringChangelogs.isEmpty()) {
+            final ConsumerRecords<byte[], byte[]> polledRecords;
+
+            try {
+                // for restoring active and updating standby we may prefer different poll time
+                // in order to make sure we call the main consumer#poll in time.
+                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
+                // JNH: Fix this?
+                // Update state based on paused/resumed status.
+                for (final TopicPartition partition : restoringChangelogs) {
+                    final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                    final Task task = tasks.get(taskId);
+                    if (task != null) {
+                        restoreConsumer.resume(Collections.singleton(partition));
+                    } else {
+                        restoreConsumer.pause(Collections.singleton(partition));
+                    }
+                }
+
+                polledRecords = restoreConsumer.poll(
+                    state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
+
+                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
+                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
+            } catch (final InvalidOffsetException e) {
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions()
+                    + ", it is likely that " +
+                    "the consumer's position has fallen out of the topic partition offset range because the topic was "
+                    + "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing"
+                    + " it later.", e);
+
+                final Set<TaskId> corruptedTasks = new HashSet<>();
+                e.partitions().forEach(partition -> corruptedTasks.add(
+                    changelogs.get(partition).stateManager.taskId()));
+                throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final KafkaException e) {
+                throw new StreamsException(
+                    "Restore consumer get unexpected error polling records.", e);
+            }
+
+            // JNH: Fix this?
+            for (final TopicPartition partition : polledRecords.partitions()) {
+                bufferChangelogRecords(restoringChangelogByPartition(partition),
+                    polledRecords.records(partition));
+            }
+
+            for (final TopicPartition partition : restoringChangelogs) {
+                // even if some partition do not have any accumulated data, we still trigger
+                // restoring since some changelog may not need to restore any at all, and the
+                // restore to end check needs to be executed still.
+                // TODO: we always try to restore as a batch when some records are accumulated, which may result in
+                // small batches; this can be optimized in the future, e.g. wait longer for larger batches.
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                // JNH: Need to revisit
+                /* Skip over paused tasks
+                final Task task = tasks.get(taskId);
+                if (task != null) {
+                    try {
+                        if (restoreChangelog(changelogs.get(partition))) {
+                            task.clearTaskTimeout();
+                        }
+                    } catch (final TimeoutException timeoutException) {
+                        tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
+                            time.milliseconds(),
+                            timeoutException
+                        );
+                    }
+                }
+                */

+                // Read from all topics.
+                try {
+                    if (restoreChangelog(changelogs.get(partition))) {
+                        final Task task = tasks.get(taskId);
+                        if (task != null) {
+                            task.clearTaskTimeout();
+                        }
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
+                        time.milliseconds(),
+                        timeoutException
+                    );
+                }
+            }
+
+            maybeUpdateLimitOffsetsForStandbyChangelogs(tasks);
+
+            maybeLogRestorationProgress();
+        }
+    }
 }

Comment (Contributor), on the inline comments above the restoreConsumer.poll() call: I think these comments should be moved before the call to restoreConsumer.poll(). BTW, this again confirms that inline comments are a poor way to document code, most of the times.

Comment (Contributor), on the "// JNH: Fix this?" markers: Please do not forget to remove those comments here and elsewhere.

 private void maybeLogRestorationProgress() {
@@ -633,7 +676,11 @@ private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks,
 }

 private void clearTaskTimeout(final Set<Task> tasks) {
-    tasks.forEach(Task::clearTaskTimeout);
+    tasks.forEach(t -> {
+        if (t != null) {
+            t.clearTaskTimeout();
+        }
+    });
 }

 private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks,
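The per-partition decision the reworked restore loop makes can be isolated: a changelog partition stays resumed on the restore consumer only when its owning task appears in the not-paused task map handed to restore(). The sketch below models just that decision with plain strings; all names here are hypothetical stand-ins, not the Streams internals:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the restore-consumer pause/resume decision: a changelog partition
// is resumed only when its owning task is among the not-paused tasks; a task
// missing from that set means its topology is paused, so the partition is
// paused and no records are fetched for it.
public class RestorePauseSketch {

    public static Map<String, Boolean> resumeDecisions(
            final Map<String, String> partitionToTaskId,
            final Set<String> notPausedTaskIds) {
        final Map<String, Boolean> resumed = new LinkedHashMap<>();
        for (final Map.Entry<String, String> entry : partitionToTaskId.entrySet()) {
            // task present => not paused => keep restoring this changelog
            resumed.put(entry.getKey(), notPausedTaskIds.contains(entry.getValue()));
        }
        return resumed;
    }

    public static void main(final String[] args) {
        final Map<String, String> partitionToTaskId = new LinkedHashMap<>();
        partitionToTaskId.put("store-changelog-0", "0_0");
        partitionToTaskId.put("store-changelog-1", "0_1");

        // prints {store-changelog-0=true, store-changelog-1=false}
        System.out.println(resumeDecisions(partitionToTaskId, Set.of("0_0")));
    }
}
```

This also explains the null-safe clearTaskTimeout above: since paused tasks are filtered out of the map, a lookup by task id can now legitimately return null.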
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
 }
 // we can always let changelog reader try restoring in order to initialize the changelogs;
 // if there's no active restoring or standby updating it would not try to fetch any data
-changelogReader.restore(taskManager.tasks());
+// After KAFKA-13873, we only restore the not paused tasks.
+changelogReader.restore(taskManager.notPausedTasks());

Comment (Contributor): This should also be verified in a unit test with a mock changelog reader.

Reply (Author): I think I've covered this with a new test in StreamThreadTest.

 log.debug("Idempotent restore call done. Thread state has not changed.");
 }
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java

@@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
     private static final long CONSTANT_BACKOFF_MS = 5_000L;

     private final boolean hasNamedTopologies;
+    private final Set<String> pausedTopologies;
     // map of topologies experiencing errors/currently under backoff
     private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();

-    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+    public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
         this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        this.pausedTopologies = pausedTopologies;
     }

     public boolean canProcessTask(final Task task, final long now) {
Comment (Contributor): I am aware that there are now unit tests for this class, but there are enough different code paths that would justify to add unit tests for this method.

Reply (Author): I've added some unit tests for TaskExecutionMetadata here: 0a37879

         final String topologyName = task.id().topologyName();
         if (!hasNamedTopologies) {
             // TODO implement error handling/backoff for non-named topologies (needs KIP)
-            return true;
+            return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
         } else {
-            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
-            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+            if (pausedTopologies.contains(topologyName)) {
+                return false;
+            } else {
+                final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+                return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+            }
         }
     }

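The gating logic above composes two checks: the pause flag always wins, and the error-backoff metadata is consulted only for named topologies that are not paused. A condensed, self-contained sketch (with "underBackoff" as a hypothetical stand-in for the NamedTopologyMetadata checks) makes the decision table explicit:

```java
import java.util.Set;

// Condensed sketch of TaskExecutionMetadata.canProcessTask(): a task is
// processable only when its topology is not paused and, for named
// topologies, not under error backoff.
public class CanProcessSketch {
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    public static boolean canProcessTask(final String topologyName,
                                         final boolean hasNamedTopologies,
                                         final Set<String> pausedTopologies,
                                         final boolean underBackoff) {
        if (!hasNamedTopologies) {
            // unnamed topology: only the pause flag gates processing
            return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
        } else if (pausedTopologies.contains(topologyName)) {
            return false;
        } else {
            // stands in for the NamedTopologyMetadata error/backoff checks
            return !underBackoff;
        }
    }

    public static void main(final String[] args) {
        final Set<String> paused = Set.of("topo-a");
        System.out.println(canProcessTask("topo-a", true, paused, false)); // prints false (paused)
        System.out.println(canProcessTask("topo-b", true, paused, false)); // prints true
        System.out.println(canProcessTask("topo-b", true, paused, true));  // prints false (backoff)
    }
}
```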
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java

@@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() {
 int punctuate() {
     int punctuated = 0;

-    for (final Task task : tasks.activeTasks()) {
+    for (final Task task : tasks.notPausedTasks()) {
Comment (Contributor): Also here unit tests would be great and easily doable.

Reply (Author): @cadonna Ok, I tried to add some unit tests for TaskExecutor and I think I was needing to mock quite a few things. Does that seem right? Or is there an easier way?

Reply (Author): Note to self: Use notPausedActiveTasks.

         try {
             if (task.maybePunctuateStreamTime()) {
                 punctuated++;
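The punctuate() change is small but worth isolating: the loop now draws from the not-paused task view, so a paused task is never offered a punctuation. A minimal sketch, with Task as a stand-in interface rather than the Streams internal one:

```java
import java.util.List;

// Sketch of TaskExecutor.punctuate() after the change: iterate only the
// not-paused tasks and count how many actually punctuated.
public class PunctuateSketch {

    interface Task {
        boolean maybePunctuateStreamTime();
    }

    static int punctuate(final List<Task> notPausedTasks) {
        int punctuated = 0;
        for (final Task task : notPausedTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }

    public static void main(final String[] args) {
        final Task fires = () -> true;
        final Task idle = () -> false;
        System.out.println(punctuate(List.of(fires, idle, fires))); // prints 2
    }
}
```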
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -1018,6 +1018,11 @@ Map<TaskId, Task> tasks() {
     return tasks.tasksPerId();
 }

+Map<TaskId, Task> notPausedTasks() {
+    return Collections.unmodifiableMap(tasks.notPausedTasks().stream()
+        .collect(Collectors.toMap(Task::id, v -> v)));
+}
+
 Map<TaskId, Task> activeTaskMap() {
     return activeTaskStream().collect(Collectors.toMap(Task::id, t -> t));
 }
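The new accessor re-keys the not-paused task collection by task id into an unmodifiable map, matching the shape of the existing tasks() accessor. A standalone sketch of that collection step, with TaskStub as a hypothetical stand-in for the internal Task interface:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of TaskManager.notPausedTasks(): collect a task collection into an
// unmodifiable map keyed by task id, as the diff above does with
// Collections.unmodifiableMap and Collectors.toMap.
public class NotPausedTaskMapSketch {

    static final class TaskStub {
        private final String id;
        TaskStub(final String id) { this.id = id; }
        String id() { return id; }
    }

    static Map<String, TaskStub> notPausedTasks(final Collection<TaskStub> notPaused) {
        return Collections.unmodifiableMap(notPaused.stream()
            .collect(Collectors.toMap(TaskStub::id, v -> v)));
    }

    public static void main(final String[] args) {
        final Map<String, TaskStub> byId =
            notPausedTasks(List.of(new TaskStub("0_0"), new TaskStub("0_1")));
        System.out.println(byId.size()); // prints 2
    }
}
```

Returning an unmodifiable view keeps callers from mutating the task manager's bookkeeping through the snapshot.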
Original file: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;

+import java.util.ArrayList;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -273,6 +274,13 @@ Collection<Task> allTasks() {
     return readOnlyTasks;
 }

Comment (Contributor): Unit tests would be great!

Comment (Contributor): Unit tests would still be great!

Reply (Author): @cadonna Same deal here, I started to try and test Tasks instances and I started down a path of mocking quite a bit of the internal classes. I think I'm either missing an easier approach or testing these functions directly may require a decent amount of effort.

+    Collection<Task> notPausedTasks() {
+        return new ArrayList<>(readOnlyTasks)
+            .stream()
+            .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
+            .collect(Collectors.toList());
+    }

Set<TaskId> activeTaskIds() {
return readOnlyActiveTaskIds;
}
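Tasks.notPausedTasks() above copies the read-only task view and drops every task whose topology is currently paused. The filter can be exercised in isolation; in this sketch TaskStub and the paused-topology set are simplified stand-ins for the internal Task and TopologyMetadata classes:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of Tasks.notPausedTasks(): copy the read-only task view and keep
// only tasks whose topology name is not in the paused set.
public class NotPausedTasksFilterSketch {

    static final class TaskStub {
        final String id;
        final String topologyName;
        TaskStub(final String id, final String topologyName) {
            this.id = id;
            this.topologyName = topologyName;
        }
    }

    static List<TaskStub> notPausedTasks(final Collection<TaskStub> allTasks,
                                         final Set<String> pausedTopologies) {
        return new ArrayList<>(allTasks)
            .stream()
            .filter(t -> !pausedTopologies.contains(t.topologyName))
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<TaskStub> tasks = List.of(
            new TaskStub("0_0", "topo-a"),
            new TaskStub("1_0", "topo-b"));
        // only the topo-b task survives when topo-a is paused
        System.out.println(notPausedTasks(tasks, Set.of("topo-a")).size()); // prints 1
    }
}
```

The defensive ArrayList copy mirrors the diff's workaround for iterating a read-only view while filtering.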