-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies #12161
Changes from 13 commits
207c4ef
b73708f
070b867
d019d7a
66fc07e
e0a80bb
85e506d
a620621
0a37879
34ec8ac
fb016a8
6d57d04
d007d15
d7be172
b2385ef
759d3a8
cfb18f5
a3bd8ae
cb6d6f1
722132f
c8f8c74
337c995
0cedb80
d30e3b3
148da23
e467852
1bcc5c9
08e332d
c495fb6
771c296
e076a07
f56dd99
6771854
12342d5
61a302b
06c98ea
1cdbc15
ce707de
769a53a
ea77133
f7799a5
e1c698b
c5d7abc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,6 @@ | |
import org.apache.kafka.streams.processor.internals.ClientUtils; | ||
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; | ||
import org.apache.kafka.streams.processor.internals.GlobalStreamThread; | ||
import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State; | ||
import org.apache.kafka.streams.processor.internals.StateDirectory; | ||
import org.apache.kafka.streams.processor.internals.StreamThread; | ||
import org.apache.kafka.streams.processor.internals.StreamsMetadataState; | ||
|
@@ -65,6 +64,7 @@ | |
import org.apache.kafka.streams.processor.internals.TopologyMetadata; | ||
import org.apache.kafka.streams.processor.internals.assignment.AssignorError; | ||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; | ||
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; | ||
import org.apache.kafka.streams.query.FailureReason; | ||
import org.apache.kafka.streams.query.PositionBound; | ||
import org.apache.kafka.streams.query.QueryConfig; | ||
|
@@ -111,6 +111,7 @@ | |
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; | ||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; | ||
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; | ||
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; | ||
|
||
/** | ||
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and | ||
|
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { | |
return queryableStoreProvider.getStore(storeQueryParameters); | ||
} | ||
|
||
/** | ||
* This method pauses processing for the KafkaStreams instance. | ||
* | ||
* Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. | ||
* Notably, paused topologies will still poll Kafka consumers, and commit offsets. | ||
* This method sets transient state that is not maintained or managed among instances. | ||
* Note that pause() can be called before start() in order to start a KafkaStreams instance | ||
* in a manner where the processing is paused as described, but the consumers are started up. | ||
*/ | ||
public void pause() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please add unit tests for the new methods? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes! I added some in this commit e0a80bb. |
||
if (topologyMetadata.hasNamedTopologies()) { | ||
for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) { | ||
topologyMetadata.pauseTopology(namedTopology.name()); | ||
} | ||
} else { | ||
topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); | ||
} | ||
} | ||
|
||
/** | ||
* @return true when the KafkaStreams instance has its processing paused. | ||
*/ | ||
public boolean isPaused() { | ||
if (topologyMetadata.hasNamedTopologies()) { | ||
return topologyMetadata.getAllNamedTopologies().stream() | ||
.map(NamedTopology::name) | ||
.allMatch(topologyMetadata::isPaused); | ||
} else { | ||
return topologyMetadata.isPaused(UNNAMED_TOPOLOGY); | ||
} | ||
} | ||
|
||
/** | ||
* This method resumes processing for the KafkaStreams instance. | ||
*/ | ||
public void resume() { | ||
if (topologyMetadata.hasNamedTopologies()) { | ||
for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) { | ||
topologyMetadata.resumeTopology(namedTopology.name()); | ||
} | ||
} else { | ||
topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY); | ||
} | ||
} | ||
|
||
/** | ||
* handle each stream thread in a snapshot of threads. | ||
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.apache.kafka.streams.processor.StateRestoreListener; | ||
import org.apache.kafka.streams.processor.TaskId; | ||
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; | ||
import org.apache.kafka.streams.processor.internals.Task.TaskType; | ||
import org.slf4j.Logger; | ||
|
||
import java.time.Duration; | ||
|
@@ -429,6 +430,8 @@ public void restore(final Map<TaskId, Task> tasks) { | |
final ConsumerRecords<byte[], byte[]> polledRecords; | ||
|
||
try { | ||
updateStandbyPartitions(tasks, restoringChangelogs); | ||
|
||
// for restoring active and updating standby we may prefer different poll time | ||
// in order to make sure we call the main consumer#poll in time. | ||
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern | ||
|
@@ -463,7 +466,10 @@ public void restore(final Map<TaskId, Task> tasks) { | |
final TaskId taskId = changelogs.get(partition).stateManager.taskId(); | ||
try { | ||
if (restoreChangelog(changelogs.get(partition))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this method call avoids restoring state stores of paused tasks? Wouldn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll look to add a unit test to verify things. This part may get subtle... I tried to update things and the ITs showing that one can start topologies in paused state broke. |
||
tasks.get(taskId).clearTaskTimeout(); | ||
final Task task = tasks.get(taskId); | ||
if (task != null) { | ||
jnh5y marked this conversation as resolved.
Show resolved
Hide resolved
|
||
task.clearTaskTimeout(); | ||
} | ||
} | ||
} catch (final TimeoutException timeoutException) { | ||
tasks.get(taskId).maybeInitTaskTimeoutOrThrow( | ||
|
@@ -479,6 +485,26 @@ public void restore(final Map<TaskId, Task> tasks) { | |
} | ||
} | ||
|
||
private void updateStandbyPartitions(final Map<TaskId, Task> tasks, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you forgot to also pause the active tasks in restoration. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering if we can make this a more general rule, like:
I know for now it does not matter since we always pause all tasks with the current APIs, but this is extensible for finer-grained controls in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After your changes that include the restoration of active tasks the name of the method does not reflect what the method does anymore. Please rename the method to something more appropriate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not forget to rename this method to something more meaningful. |
||
final Set<TopicPartition> restoringChangelogs) { | ||
// For standby tasks in the `tasks` map, we make sure that the `restoreConsumer`'s state | ||
// is correct. | ||
if (state == ChangelogReaderState.STANDBY_UPDATING) { | ||
for (final TopicPartition partition : restoringChangelogs) { | ||
final ProcessorStateManager manager = changelogs.get(partition).stateManager; | ||
final TaskId taskId = manager.taskId(); | ||
final Task task = tasks.get(taskId); | ||
if (manager.taskType() == TaskType.STANDBY) { | ||
if (task != null) { | ||
restoreConsumer.resume(Collections.singleton(partition)); | ||
} else { | ||
restoreConsumer.pause(Collections.singleton(partition)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a better way would be to collect the partitions to resume and pause and then call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in cb6d6f1. |
||
} | ||
} | ||
} | ||
} | ||
|
||
private void maybeLogRestorationProgress() { | ||
if (state == ChangelogReaderState.ACTIVE_RESTORING) { | ||
if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) { | ||
|
@@ -633,7 +659,11 @@ private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks, | |
} | ||
|
||
private void clearTaskTimeout(final Set<Task> tasks) { | ||
tasks.forEach(Task::clearTaskTimeout); | ||
tasks.forEach(t -> { | ||
if (t != null) { | ||
t.clearTaskTimeout(); | ||
} | ||
}); | ||
} | ||
|
||
private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -897,7 +897,8 @@ private void initializeAndRestorePhase() { | |
} | ||
// we can always let changelog reader try restoring in order to initialize the changelogs; | ||
// if there's no active restoring or standby updating it would not try to fetch any data | ||
changelogReader.restore(taskManager.tasks()); | ||
// After KAFKA-13873, we only restore the not paused tasks. | ||
changelogReader.restore(taskManager.notPausedTasks()); | ||
jnh5y marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should also be verified in a unit test with a mock changelog reader. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I've covered this with a new test in StreamThreadTest. |
||
log.debug("Idempotent restore call done. Thread state has not changed."); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,21 +35,27 @@ public class TaskExecutionMetadata { | |
private static final long CONSTANT_BACKOFF_MS = 5_000L; | ||
|
||
private final boolean hasNamedTopologies; | ||
private final Set<String> pausedTopologies; | ||
// map of topologies experiencing errors/currently under backoff | ||
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>(); | ||
|
||
public TaskExecutionMetadata(final Set<String> allTopologyNames) { | ||
public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) { | ||
this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); | ||
this.pausedTopologies = pausedTopologies; | ||
} | ||
|
||
public boolean canProcessTask(final Task task, final long now) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am aware that there are now unit tests for this class, but there are enough different code paths that would justify to add unit tests for this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added some unit tests for |
||
final String topologyName = task.id().topologyName(); | ||
if (!hasNamedTopologies) { | ||
// TODO implement error handling/backoff for non-named topologies (needs KIP) | ||
return true; | ||
return !pausedTopologies.contains(UNNAMED_TOPOLOGY); | ||
} else { | ||
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); | ||
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); | ||
if (pausedTopologies.contains(topologyName)) { | ||
return false; | ||
} else { | ||
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); | ||
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() { | |
int punctuate() { | ||
int punctuated = 0; | ||
|
||
for (final Task task : tasks.activeTasks()) { | ||
for (final Task task : tasks.notPausedTasks()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also here unit tests would be great and easily doable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cadonna Ok, I tried to add some unit tests for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: Use |
||
try { | ||
if (task.maybePunctuateStreamTime()) { | ||
punctuated++; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
*/ | ||
package org.apache.kafka.streams.processor.internals; | ||
|
||
import java.util.ArrayList; | ||
import org.apache.kafka.clients.consumer.Consumer; | ||
import org.apache.kafka.common.Metric; | ||
import org.apache.kafka.common.MetricName; | ||
|
@@ -273,6 +274,13 @@ Collection<Task> allTasks() { | |
return readOnlyTasks; | ||
} | ||
|
||
Collection<Task> notPausedTasks() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unit tests would be great! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unit tests would still be great! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cadonna Same deal here, I started to try and test I think I'm either missing an easier approach or testing these functions directly may require a decent amount of effort. |
||
return new ArrayList<>(readOnlyTasks) | ||
.stream() | ||
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName())) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
Set<TaskId> activeTaskIds() { | ||
return readOnlyActiveTaskIds; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,6 +73,7 @@ public class TopologyMetadata { | |
private final ProcessingMode processingMode; | ||
private final TopologyVersion version; | ||
private final TaskExecutionMetadata taskExecutionMetadata; | ||
private final Set<String> pausedTopologies; | ||
|
||
private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability | ||
|
||
|
@@ -104,14 +105,15 @@ public TopologyMetadata(final InternalTopologyBuilder builder, | |
this.processingMode = StreamsConfigUtils.processingMode(config); | ||
this.config = config; | ||
this.log = LoggerFactory.getLogger(getClass()); | ||
this.pausedTopologies = ConcurrentHashMap.newKeySet(); | ||
|
||
builders = new ConcurrentSkipListMap<>(); | ||
if (builder.hasNamedTopology()) { | ||
builders.put(builder.topologyName(), builder); | ||
} else { | ||
builders.put(UNNAMED_TOPOLOGY, builder); | ||
} | ||
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet()); | ||
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies); | ||
} | ||
|
||
public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders, | ||
|
@@ -120,12 +122,13 @@ public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBui | |
this.processingMode = StreamsConfigUtils.processingMode(config); | ||
this.config = config; | ||
this.log = LoggerFactory.getLogger(getClass()); | ||
this.pausedTopologies = ConcurrentHashMap.newKeySet(); | ||
|
||
this.builders = builders; | ||
if (builders.isEmpty()) { | ||
log.info("Created an empty KafkaStreams app with no topology"); | ||
} | ||
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet()); | ||
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies); | ||
} | ||
|
||
// Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix | ||
|
@@ -257,6 +260,35 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl<Void> future, fina | |
} | ||
} | ||
|
||
/** | ||
* Pauses a topology by name | ||
* @param topologyName Name of the topology to pause | ||
*/ | ||
public void pauseTopology(final String topologyName) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These methods could also be unit tested really easily. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added tests here: 34ec8ac |
||
pausedTopologies.add(topologyName); | ||
} | ||
|
||
/** | ||
* Checks if a given topology is paused. | ||
* @param topologyName If null, assume that we are checking the `UNNAMED_TOPOLOGY`. | ||
* @return A boolean indicating if the topology is paused. | ||
*/ | ||
public boolean isPaused(final String topologyName) { | ||
if (topologyName == null) { | ||
return pausedTopologies.contains(UNNAMED_TOPOLOGY); | ||
} else { | ||
return pausedTopologies.contains(topologyName); | ||
} | ||
} | ||
|
||
/** | ||
* Resumes a topology by name | ||
* @param topologyName Name of the topology to resume | ||
*/ | ||
public void resumeTopology(final String topologyName) { | ||
pausedTopologies.remove(topologyName); | ||
} | ||
|
||
/** | ||
* Removes the topology and registers a future that listens for all threads on the older version to see the update | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I think we should try to avoid adding suppressions. But I also see that
StreamThreadTest
would need quite some love at the moment which is not the intent of this PR.