
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies #12161

Merged: 43 commits, Jun 16, 2022
Changes from 32 commits

Commits (43)
207c4ef
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y May 13, 2022
b73708f
Cleaning up reviewer comments.
jnh5y May 18, 2022
070b867
Some work on fixing up the StoreChangelogReader.
jnh5y May 19, 2022
d019d7a
Updates to the IT.
jnh5y May 23, 2022
66fc07e
IntelliJ refactoring to fix complexity.
jnh5y May 23, 2022
e0a80bb
Adding unit tests for KafkaStreams.
jnh5y May 23, 2022
85e506d
Adding unit test showing that only non-paused, standby tasks are rest…
jnh5y May 23, 2022
a620621
Found the suppressions for JavaNCSS.
jnh5y May 23, 2022
0a37879
Added TaskExecutionMetadataTest.
jnh5y May 23, 2022
34ec8ac
Adding TopologyMetadataTest.
jnh5y May 23, 2022
fb016a8
Fixes to make the StoreChangelogReader work.
jnh5y May 24, 2022
6d57d04
Responding to reviewer comments.
jnh5y May 24, 2022
d007d15
Minor refactoring / renaming.
jnh5y May 24, 2022
d7be172
Update streams/src/test/java/org/apache/kafka/streams/integration/Pau…
jnh5y Jun 1, 2022
b2385ef
Merge remote-tracking branch 'origin/trunk' into kafka-13873
jnh5y Jun 1, 2022
759d3a8
Using assertThrows.
jnh5y Jun 1, 2022
cfb18f5
Updating TaskExecutorMetadataTest.
jnh5y Jun 1, 2022
a3bd8ae
Extracted some code in StreamThreadTest.java.
jnh5y Jun 1, 2022
cb6d6f1
Refactoring StoreChangelogReader.
jnh5y Jun 1, 2022
722132f
Fixing a checkstyle issue.
jnh5y Jun 1, 2022
c8f8c74
commit 5d25096520503a733b5e520c022d25aaf9d939a7
jnh5y Jun 2, 2022
337c995
Various clean up.
jnh5y Jun 2, 2022
0cedb80
Small clean up.
jnh5y Jun 2, 2022
d30e3b3
Adding PR IT and fixing an import.
jnh5y Jun 2, 2022
148da23
Test which shows that active tasks still restore.
jnh5y Jun 2, 2022
e467852
Pausing active task restoration.
jnh5y Jun 3, 2022
1bcc5c9
Updated PR IT with multiple client example. This is hitting SESSION_…
jnh5y Jun 3, 2022
08e332d
Updating the PauseResume IT so that the multiple instance test runs f…
jnh5y Jun 3, 2022
c495fb6
Fixing StoreChangelogReaderTest.
jnh5y Jun 3, 2022
771c296
Removing most comments.
jnh5y Jun 3, 2022
e076a07
Added a little getTopicSize function.
jnh5y Jun 3, 2022
f56dd99
Verifying Streams pausing by watching the poll count.
jnh5y Jun 3, 2022
6771854
Update streams/src/test/java/org/apache/kafka/streams/processor/inter…
jnh5y Jun 9, 2022
12342d5
Update streams/src/test/java/org/apache/kafka/streams/processor/inter…
jnh5y Jun 9, 2022
61a302b
Update streams/src/test/java/org/apache/kafka/streams/processor/inter…
jnh5y Jun 9, 2022
06c98ea
Updating import order and sorting out other issues.
jnh5y Jun 9, 2022
1cdbc15
Responding to more reviewer comments.
jnh5y Jun 9, 2022
ce707de
Small test changes before adding new test.
jnh5y Jun 13, 2022
769a53a
Partial unit test?
jnh5y Jun 14, 2022
ea77133
Added new unit test.
jnh5y Jun 15, 2022
f7799a5
Removing assertNoLag.
jnh5y Jun 15, 2022
e1c698b
Responding to reviewer comments.
jnh5y Jun 15, 2022
c5d7abc
Responding to reviewer comments.
jnh5y Jun 15, 2022
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

<suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
Reviewer (Contributor): In general, I think we should try to avoid adding suppressions. But I also see that StreamThreadTest would need quite some love at the moment, which is not the intent of this PR.


<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
Expand Down
48 changes: 47 additions & 1 deletion streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
Expand All @@ -65,6 +64,7 @@
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
Expand Down Expand Up @@ -111,6 +111,7 @@
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;

/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
Expand Down Expand Up @@ -1735,6 +1736,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
return queryableStoreProvider.getStore(storeQueryParameters);
}

/**
* This method pauses processing for the KafkaStreams instance.
*
* Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
* Notably, paused topologies will still poll Kafka consumers, and commit offsets.
* This method sets transient state that is not maintained or managed among instances.
* Note that pause() can be called before start() in order to start a KafkaStreams instance
* in a manner where the processing is paused as described, but the consumers are started up.
*/
public void pause() {
Reviewer (Contributor): Could you please add unit tests for the new methods?

Author (jnh5y), May 23, 2022: Yes! I added some in this commit e0a80bb.

if (topologyMetadata.hasNamedTopologies()) {
for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
topologyMetadata.pauseTopology(namedTopology.name());
}
} else {
topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
}
}

/**
* @return true when the KafkaStreams instance has its processing paused.
*/
public boolean isPaused() {
if (topologyMetadata.hasNamedTopologies()) {
return topologyMetadata.getAllNamedTopologies().stream()
.map(NamedTopology::name)
.allMatch(topologyMetadata::isPaused);
} else {
return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
}
}

/**
* This method resumes processing for the KafkaStreams instance.
*/
public void resume() {
if (topologyMetadata.hasNamedTopologies()) {
for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
topologyMetadata.resumeTopology(namedTopology.name());
}
} else {
topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
}
}

/**
* handle each stream thread in a snapshot of threads.
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
Expand Down
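The pause/resume bookkeeping added to KafkaStreams above can be modeled in a few lines of plain Java. This is a sketch only: `PauseTracker` and its members are hypothetical names, not part of the Kafka API. With named topologies, pause() marks each one paused and isPaused() reports true only when all of them are; otherwise the UNNAMED_TOPOLOGY sentinel stands in for the single topology.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for TopologyMetadata's paused-topology bookkeeping,
// mirroring the KafkaStreams#pause()/isPaused()/resume() logic in this PR.
public class PauseTracker {
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    private final Set<String> namedTopologies = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    void addNamedTopology(final String name) {
        namedTopologies.add(name);
    }

    private boolean hasNamedTopologies() {
        return !namedTopologies.isEmpty();
    }

    // pause(): pause every named topology, or the single unnamed one.
    public void pause() {
        if (hasNamedTopologies()) {
            paused.addAll(namedTopologies);
        } else {
            paused.add(UNNAMED_TOPOLOGY);
        }
    }

    // resume(): undo whatever pause() paused.
    public void resume() {
        if (hasNamedTopologies()) {
            paused.removeAll(namedTopologies);
        } else {
            paused.remove(UNNAMED_TOPOLOGY);
        }
    }

    // isPaused(): every named topology must be paused, or the unnamed one.
    public boolean isPaused() {
        if (hasNamedTopologies()) {
            return paused.containsAll(namedTopologies);
        }
        return paused.contains(UNNAMED_TOPOLOGY);
    }

    public static void main(final String[] args) {
        final PauseTracker tracker = new PauseTracker();
        tracker.addNamedTopology("topology-a");
        tracker.addNamedTopology("topology-b");
        tracker.pause();
        System.out.println(tracker.isPaused());
        tracker.resume();
        System.out.println(tracker.isPaused());
    }
}
```

Note that, per the javadoc above, the real pause() may be called before start(), since it only flips transient per-topology state.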
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> c
}

// TODO: change return type to `StreamTask`
Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
// TODO: change type to `StreamTask`
final List<Task> createdTasks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.slf4j.Logger;

import java.time.Duration;
Expand Down Expand Up @@ -429,6 +430,8 @@ public void restore(final Map<TaskId, Task> tasks) {
final ConsumerRecords<byte[], byte[]> polledRecords;

try {
updateStandbyPartitions(tasks, restoringChangelogs);

// for restoring active and updating standby we may prefer different poll time
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
Expand Down Expand Up @@ -463,7 +466,10 @@ public void restore(final Map<TaskId, Task> tasks) {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
try {
if (restoreChangelog(changelogs.get(partition))) {
Reviewer (Contributor): Are you sure this method call avoids restoring state stores of paused tasks? Wouldn't restoringChangelogs() still return all changelog partitions that are in restoration, and not just those that are not paused? Maybe it is possible to add some verifications to the unit tests to ensure that only non-paused tasks are restored.

Author (jnh5y): I'll look to add a unit test to verify things. This part may get subtle... I tried to update things, and the ITs showing that one can start topologies in a paused state broke.

tasks.get(taskId).clearTaskTimeout();
final Task task = tasks.get(taskId);
if (task != null) {
task.clearTaskTimeout();
}
}
} catch (final TimeoutException timeoutException) {
tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
Expand All @@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
}
}

private void updateStandbyPartitions(final Map<TaskId, Task> tasks,
Reviewer (Contributor): I think you forgot to also pause the active tasks in restoration.

Reviewer (Contributor): I'm wondering if we can make this a more general rule, like:

  1. When the reader is in the Restore_Active state, i.e. there is at least one active task that needs restoration (for simplicity, say exactly one, taskA), and taskA is paused, we should be able to transition to Update_Standby.
  2. When the reader is in the Update_Standby state and one active task, say taskA, is resumed, we should be able to transition to Restore_Active.

I know for now it does not matter, since we always pause all tasks with the current APIs, but this is extensible for finer-grained controls in the future.

Reviewer (Contributor): After your changes that include the restoration of active tasks, the name of the method no longer reflects what it does. Please rename the method to something more appropriate.

Reviewer (Contributor): Do not forget to rename this method to something more meaningful. Proposal: pauseResumePartitions()
final Set<TopicPartition> restoringChangelogs) {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE);
}
if (state == ChangelogReaderState.STANDBY_UPDATING) {
updatePartitionsByType(tasks, restoringChangelogs, TaskType.STANDBY);
}
}

private void updatePartitionsByType(final Map<TaskId, Task> tasks,
final Set<TopicPartition> restoringChangelogs,
final TaskType taskType) {
final Collection<TopicPartition> toResume =
restoringChangelogs.stream().filter(t -> shouldResume(tasks, t, taskType)).collect(Collectors.toList());
final Collection<TopicPartition> toPause =
restoringChangelogs.stream().filter(t -> shouldPause(tasks, t, taskType)).collect(Collectors.toList());
restoreConsumer.resume(toResume);
restoreConsumer.pause(toPause);
}

private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
final ProcessorStateManager manager = changelogs.get(partition).stateManager;
final TaskId taskId = manager.taskId();
final Task task = tasks.get(taskId);
if (manager.taskType() == taskType) {
return task != null;
}
return false;
}

private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
final ProcessorStateManager manager = changelogs.get(partition).stateManager;
final TaskId taskId = manager.taskId();
final Task task = tasks.get(taskId);
if (manager.taskType() == taskType) {
return task == null;
}
return false;
}
Comment on lines +520 to +527

Reviewer (Contributor): Why not check the task type earlier? If the task type does not match, you do not need to do anything else.

Suggested change (original, then proposed):
final ProcessorStateManager manager = changelogs.get(partition).stateManager;
final TaskId taskId = manager.taskId();
final Task task = tasks.get(taskId);
if (manager.taskType() == taskType) {
return task == null;
}
return false;
}
final ProcessorStateManager manager = changelogs.get(partition).stateManager;
if (manager.taskType() == taskType) {
final TaskId taskId = manager.taskId();
final Task task = tasks.get(taskId);
return task == null;
}
return false;
}


private void maybeLogRestorationProgress() {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
Expand Down Expand Up @@ -633,7 +680,11 @@ private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks,
}

private void clearTaskTimeout(final Set<Task> tasks) {
tasks.forEach(Task::clearTaskTimeout);
tasks.forEach(t -> {
if (t != null) {
t.clearTaskTimeout();
}
});
}

private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks,
Expand Down
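The shouldResume/shouldPause pair above reduces to a single filtering rule: among restoring changelog partitions whose owning task matches the reader's current task type, resume those whose task is present in the not-paused task map and pause the rest. A self-contained sketch of that rule (hypothetical class and field names, not the real StoreChangelogReader):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical miniature of the pause/resume partitioning added to
// StoreChangelogReader: each changelog partition (keyed by name) knows the
// task id and task type of its owning task.
public class ChangelogPauser {
    enum TaskType { ACTIVE, STANDBY }

    static final class Owner {
        final String taskId;
        final TaskType type;
        Owner(final String taskId, final TaskType type) {
            this.taskId = taskId;
            this.type = type;
        }
    }

    private final Map<String, Owner> changelogs;

    ChangelogPauser(final Map<String, Owner> changelogs) {
        this.changelogs = changelogs;
    }

    // Partitions whose type-matching task is still in the not-paused map.
    List<String> toResume(final Set<String> restoring, final Set<String> notPausedTaskIds, final TaskType type) {
        return restoring.stream()
            .filter(p -> changelogs.get(p).type == type)
            .filter(p -> notPausedTaskIds.contains(changelogs.get(p).taskId))
            .sorted()
            .collect(Collectors.toList());
    }

    // Partitions whose type-matching task has been filtered out as paused.
    List<String> toPause(final Set<String> restoring, final Set<String> notPausedTaskIds, final TaskType type) {
        return restoring.stream()
            .filter(p -> changelogs.get(p).type == type)
            .filter(p -> !notPausedTaskIds.contains(changelogs.get(p).taskId))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final Map<String, Owner> changelogs = new HashMap<>();
        changelogs.put("store-a-0", new Owner("0_0", TaskType.ACTIVE));
        changelogs.put("store-b-0", new Owner("0_1", TaskType.ACTIVE));
        final Set<String> restoring = new HashSet<>(changelogs.keySet());
        final ChangelogPauser pauser = new ChangelogPauser(changelogs);
        // Task 0_1 is paused (absent from the not-paused set), so only its
        // changelog partition lands in the pause list.
        final Set<String> notPaused = new HashSet<>(Arrays.asList("0_0"));
        System.out.println(pauser.toResume(restoring, notPaused, TaskType.ACTIVE));
        System.out.println(pauser.toPause(restoring, notPaused, TaskType.ACTIVE));
    }
}
```

In the real reader, the resume/pause lists are then handed to the restore consumer's `resume()` and `pause()` calls, as in `updatePartitionsByType` above.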
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore(taskManager.tasks());
// After KAFKA-13873, we only restore the not paused tasks.
changelogReader.restore(taskManager.notPausedTasks());
Reviewer (Contributor): This should also be verified in a unit test with a mock changelog reader.

Author (jnh5y): I think I've covered this with a new test in StreamThreadTest.

log.debug("Idempotent restore call done. Thread state has not changed.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
private static final long CONSTANT_BACKOFF_MS = 5_000L;

private final boolean hasNamedTopologies;
private final Set<String> pausedTopologies;
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();

public TaskExecutionMetadata(final Set<String> allTopologyNames) {
public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
}

public boolean canProcessTask(final Task task, final long now) {
Reviewer (Contributor): I am aware that there are now unit tests for this class, but there are enough different code paths here to justify adding unit tests for this method as well.

Author (jnh5y): I've added some unit tests for TaskExecutionMetadata here: 0a37879

final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
return true;
return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
} else {
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
if (pausedTopologies.contains(topologyName)) {
return false;
} else {
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
}
}
}

Expand Down
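The new canProcessTask() branches come down to: the paused check always wins, and the error/backoff metadata is only consulted for non-paused named topologies. A minimal model of that decision (hypothetical class name; the NamedTopologyMetadata backoff check is stubbed out as a predicate):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of TaskExecutionMetadata#canProcessTask's decision:
// a paused topology is never processable; otherwise named topologies
// additionally consult their error/backoff metadata.
public class ProcessGate {
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    private final boolean hasNamedTopologies;
    private final Set<String> pausedTopologies;
    private final Predicate<String> backoffAllows; // stand-in for NamedTopologyMetadata

    ProcessGate(final boolean hasNamedTopologies,
                final Set<String> pausedTopologies,
                final Predicate<String> backoffAllows) {
        this.hasNamedTopologies = hasNamedTopologies;
        this.pausedTopologies = pausedTopologies;
        this.backoffAllows = backoffAllows;
    }

    boolean canProcessTask(final String topologyName) {
        if (!hasNamedTopologies) {
            // Unnamed case: only the pause flag matters (no per-topology backoff yet).
            return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
        }
        if (pausedTopologies.contains(topologyName)) {
            return false;
        }
        return backoffAllows.test(topologyName);
    }

    public static void main(final String[] args) {
        final Set<String> paused = new HashSet<>();
        paused.add("topo-b");
        final ProcessGate gate = new ProcessGate(true, paused, name -> true);
        System.out.println(gate.canProcessTask("topo-a"));
        System.out.println(gate.canProcessTask("topo-b"));
    }
}
```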
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() {
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
for (final Task task : tasks.notPausedActiveTasks()) {
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class TaskManager {
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());

this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
this.tasks = new Tasks(logContext, topologyMetadata, activeTaskCreator, standbyTaskCreator);
this.taskExecutor = new TaskExecutor(
tasks,
topologyMetadata.taskExecutionMetadata(),
Expand Down Expand Up @@ -1018,6 +1018,11 @@ Map<TaskId, Task> tasks() {
return tasks.tasksPerId();
}

Map<TaskId, Task> notPausedTasks() {
return Collections.unmodifiableMap(tasks.notPausedTasks().stream()
.collect(Collectors.toMap(Task::id, v -> v)));
}

Map<TaskId, Task> activeTaskMap() {
return activeTaskStream().collect(Collectors.toMap(Task::id, t -> t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,28 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

import java.util.HashSet;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

class Tasks {
private final Logger log;
private final TopologyMetadata topologyMetadata;
private final StreamsMetricsImpl streamsMetrics;

private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());

Expand Down Expand Up @@ -68,14 +65,12 @@ class Tasks {

Tasks(final LogContext logContext,
final TopologyMetadata topologyMetadata,
final StreamsMetricsImpl streamsMetrics,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator) {

log = logContext.logger(getClass());

this.topologyMetadata = topologyMetadata;
this.streamsMetrics = streamsMetrics;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
}
Expand Down Expand Up @@ -273,6 +268,20 @@ Collection<Task> allTasks() {
return readOnlyTasks;
}

Collection<Task> notPausedActiveTasks() {
return new ArrayList<>(readOnlyActiveTasks)
.stream()
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
.collect(Collectors.toList());
}

Collection<Task> notPausedTasks() {
Reviewer (Contributor): Unit tests would be great!

Reviewer (Contributor): Unit tests would still be great!

Author (jnh5y): @cadonna Same deal here: I started to try to test Tasks instances and went down a path of mocking quite a bit of the internal classes. I think I'm either missing an easier approach, or testing these functions directly may require a decent amount of effort.
return new ArrayList<>(readOnlyTasks)
.stream()
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
.collect(Collectors.toList());
}

Set<TaskId> activeTaskIds() {
return readOnlyActiveTaskIds;
}
Expand Down
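notPausedTasks() and notPausedActiveTasks() above are the same filter applied to two collections: drop every task whose topology is currently paused. A compact sketch (hypothetical MiniTask type; the paused check is injected as a predicate rather than coming from TopologyMetadata):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical miniature of Tasks#notPausedTasks(): filter a task view by
// the paused state of each task's topology.
public class NotPausedFilter {
    static final class MiniTask {
        final String taskId;
        final String topologyName;
        MiniTask(final String taskId, final String topologyName) {
            this.taskId = taskId;
            this.topologyName = topologyName;
        }
    }

    static List<MiniTask> notPausedTasks(final List<MiniTask> allTasks, final Predicate<String> isPaused) {
        return allTasks.stream()
            .filter(t -> !isPaused.test(t.topologyName))
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<MiniTask> all = Arrays.asList(
            new MiniTask("0_0", "topo-a"),
            new MiniTask("0_1", "topo-b"));
        final Set<String> paused = Collections.singleton("topo-b");
        for (final MiniTask t : notPausedTasks(all, paused::contains)) {
            System.out.println(t.taskId);
        }
    }
}
```

Copying into a fresh list before streaming, as the real code does, sidesteps the synchronized-map iteration issue that motivated the switch to `Collections.synchronizedSortedMap` above.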