Skip to content

Commit

Permalink
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#1…
Browse files Browse the repository at this point in the history
…2161)

This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834).

Co-authored-by: Bruno Cadonna <[email protected]>

Reviewers: Bonnie Varghese <[email protected]>, Walker Carlson <[email protected]>, Guozhang Wang <[email protected]>, Bruno Cadonna <[email protected]>
  • Loading branch information
jnh5y authored Jun 16, 2022
1 parent a126e3a commit 7ed3748
Show file tree
Hide file tree
Showing 20 changed files with 1,125 additions and 49 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

<suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>

<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
Expand Down
48 changes: 47 additions & 1 deletion streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
Expand All @@ -65,6 +64,7 @@
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
Expand Down Expand Up @@ -111,6 +111,7 @@
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;

/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
Expand Down Expand Up @@ -1735,6 +1736,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
return queryableStoreProvider.getStore(storeQueryParameters);
}

/**
 * Pauses processing for this KafkaStreams instance.
 * <p>
 * A paused topology skips only a) processing, b) punctuation, and c) standby tasks.
 * Notably, paused topologies still poll their Kafka consumers and still commit offsets.
 * The paused flag is transient state: it is neither maintained nor managed among instances.
 * <p>
 * Note that pause() may be called before start(), which starts the KafkaStreams instance
 * with processing paused as described above while the consumers are still started up.
 */
public void pause() {
    // Without named topologies there is exactly one topology to pause: the unnamed one.
    if (!topologyMetadata.hasNamedTopologies()) {
        topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
        return;
    }
    // Otherwise pause every named topology that is currently registered.
    topologyMetadata.getAllNamedTopologies().stream()
        .map(NamedTopology::name)
        .forEach(topologyMetadata::pauseTopology);
}

/**
 * Reports whether processing is paused for this KafkaStreams instance.
 * With named topologies, every registered named topology must be paused for this to hold;
 * otherwise only the unnamed topology is consulted.
 *
 * @return true when the KafkaStreams instance has its processing paused.
 */
public boolean isPaused() {
    if (!topologyMetadata.hasNamedTopologies()) {
        return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
    }
    // Stop at the first named topology that is still running.
    for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
        if (!topologyMetadata.isPaused(namedTopology.name())) {
            return false;
        }
    }
    return true;
}

/**
 * Resumes processing for this KafkaStreams instance by resuming every named topology,
 * or the unnamed topology when no named topologies are in use.
 */
public void resume() {
    // Without named topologies there is exactly one topology to resume: the unnamed one.
    if (!topologyMetadata.hasNamedTopologies()) {
        topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
        return;
    }
    topologyMetadata.getAllNamedTopologies().stream()
        .map(NamedTopology::name)
        .forEach(topologyMetadata::resumeTopology);
}

/**
* handle each stream thread in a snapshot of threads.
* noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> c
}

// TODO: change return type to `StreamTask`
Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
// TODO: change type to `StreamTask`
final List<Task> createdTasks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.slf4j.Logger;

import java.time.Duration;
Expand Down Expand Up @@ -429,6 +430,8 @@ public void restore(final Map<TaskId, Task> tasks) {
final ConsumerRecords<byte[], byte[]> polledRecords;

try {
pauseResumePartitions(tasks, restoringChangelogs);

// for restoring active and updating standby we may prefer different poll time
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
Expand Down Expand Up @@ -463,7 +466,10 @@ public void restore(final Map<TaskId, Task> tasks) {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
try {
if (restoreChangelog(changelogs.get(partition))) {
tasks.get(taskId).clearTaskTimeout();
final Task task = tasks.get(taskId);
if (task != null) {
task.clearTaskTimeout();
}
}
} catch (final TimeoutException timeoutException) {
tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
Expand All @@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
}
}

/**
 * Pauses or resumes the restoring changelog partitions on the restore consumer, considering
 * only the task type that matches the reader's current state. Does nothing in any other state.
 *
 * @param tasks               tasks eligible for restoration, keyed by task id
 * @param restoringChangelogs changelog partitions currently being restored
 */
private void pauseResumePartitions(final Map<TaskId, Task> tasks,
                                   final Set<TopicPartition> restoringChangelogs) {
    final TaskType typeForState;
    if (state == ChangelogReaderState.ACTIVE_RESTORING) {
        typeForState = TaskType.ACTIVE;
    } else if (state == ChangelogReaderState.STANDBY_UPDATING) {
        typeForState = TaskType.STANDBY;
    } else {
        return;
    }
    updatePartitionsByType(tasks, restoringChangelogs, typeForState);
}

/**
 * Splits the restoring changelog partitions of the given task type into those to resume and
 * those to pause, then applies both decisions to the restore consumer.
 *
 * @param tasks               tasks eligible for restoration, keyed by task id
 * @param restoringChangelogs changelog partitions currently being restored
 * @param taskType            only partitions whose state manager reports this type are touched
 */
private void updatePartitionsByType(final Map<TaskId, Task> tasks,
                                    final Set<TopicPartition> restoringChangelogs,
                                    final TaskType taskType) {
    // Both selections are computed up front, before the consumer is touched.
    final Collection<TopicPartition> partitionsToResume = restoringChangelogs
        .stream()
        .filter(changelogPartition -> shouldResume(tasks, changelogPartition, taskType))
        .collect(Collectors.toList());
    final Collection<TopicPartition> partitionsToPause = restoringChangelogs
        .stream()
        .filter(changelogPartition -> shouldPause(tasks, changelogPartition, taskType))
        .collect(Collectors.toList());

    restoreConsumer.resume(partitionsToResume);
    restoreConsumer.pause(partitionsToPause);
}

/**
 * Decides whether a changelog partition should be resumed on the restore consumer.
 *
 * @return true when the partition's state manager reports the given task type and the
 *         owning task is present in {@code tasks}; false otherwise
 */
private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
    final ProcessorStateManager stateManager = changelogs.get(partition).stateManager;
    final boolean taskPresent = tasks.get(stateManager.taskId()) != null;
    return stateManager.taskType() == taskType && taskPresent;
}

/**
 * Decides whether a changelog partition should be paused on the restore consumer.
 *
 * @return true when the partition's state manager reports the given task type and the
 *         owning task is absent from {@code tasks}; false otherwise
 */
private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
    final ProcessorStateManager stateManager = changelogs.get(partition).stateManager;
    final boolean taskAbsent = tasks.get(stateManager.taskId()) == null;
    return stateManager.taskType() == taskType && taskAbsent;
}

private void maybeLogRestorationProgress() {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
Expand Down Expand Up @@ -633,7 +680,11 @@ private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks,
}

private void clearTaskTimeout(final Set<Task> tasks) {
tasks.forEach(Task::clearTaskTimeout);
tasks.forEach(t -> {
if (t != null) {
t.clearTaskTimeout();
}
});
}

private void maybeInitTaskTimeoutOrThrow(final Set<Task> tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore(taskManager.tasks());
// After KAFKA-13873, we only restore tasks that are not paused.
changelogReader.restore(taskManager.notPausedTasks());
log.debug("Idempotent restore call done. Thread state has not changed.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
private static final long CONSTANT_BACKOFF_MS = 5_000L;

private final boolean hasNamedTopologies;
private final Set<String> pausedTopologies;
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();

public TaskExecutionMetadata(final Set<String> allTopologyNames) {
public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
}

public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
return true;
return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
} else {
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
if (pausedTopologies.contains(topologyName)) {
return false;
} else {
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() {
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
for (final Task task : tasks.notPausedActiveTasks()) {
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class TaskManager {
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());

this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
this.tasks = new Tasks(logContext, topologyMetadata, activeTaskCreator, standbyTaskCreator);
this.taskExecutor = new TaskExecutor(
tasks,
topologyMetadata.taskExecutionMetadata(),
Expand Down Expand Up @@ -1018,6 +1018,11 @@ Map<TaskId, Task> tasks() {
return tasks.tasksPerId();
}

// Returns an unmodifiable map, keyed by task id, of the tasks that `Tasks#notPausedTasks` reports
Map<TaskId, Task> notPausedTasks() {
    final Map<TaskId, Task> notPausedTasksById = tasks.notPausedTasks()
        .stream()
        .collect(Collectors.toMap(Task::id, task -> task));
    return Collections.unmodifiableMap(notPausedTasksById);
}

Map<TaskId, Task> activeTaskMap() {
return activeTaskStream().collect(Collectors.toMap(Task::id, t -> t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

import java.util.HashSet;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
Expand All @@ -38,9 +37,8 @@
class Tasks {
private final Logger log;
private final TopologyMetadata topologyMetadata;
private final StreamsMetricsImpl streamsMetrics;

private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());

Expand Down Expand Up @@ -68,14 +66,12 @@ class Tasks {

Tasks(final LogContext logContext,
final TopologyMetadata topologyMetadata,
final StreamsMetricsImpl streamsMetrics,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator) {

log = logContext.logger(getClass());

this.topologyMetadata = topologyMetadata;
this.streamsMetrics = streamsMetrics;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
}
Expand Down Expand Up @@ -273,6 +269,20 @@ Collection<Task> allTasks() {
return readOnlyTasks;
}

// Returns a new collection holding the active tasks whose topology is not currently paused
Collection<Task> notPausedActiveTasks() {
    // iterate over a copy of the read-only collection rather than over the collection itself
    final Collection<Task> activeTasksCopy = new ArrayList<>(readOnlyActiveTasks);
    final Collection<Task> notPaused = new ArrayList<>();
    for (final Task task : activeTasksCopy) {
        if (!topologyMetadata.isPaused(task.id().topologyName())) {
            notPaused.add(task);
        }
    }
    return notPaused;
}

// Returns a new collection holding all tasks whose topology is not currently paused
Collection<Task> notPausedTasks() {
    // iterate over a copy of the read-only collection rather than over the collection itself
    final Collection<Task> allTasksCopy = new ArrayList<>(readOnlyTasks);
    final Collection<Task> notPaused = new ArrayList<>();
    for (final Task task : allTasksCopy) {
        if (!topologyMetadata.isPaused(task.id().topologyName())) {
            notPaused.add(task);
        }
    }
    return notPaused;
}

Set<TaskId> activeTaskIds() {
return readOnlyActiveTaskIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class TopologyMetadata {
private final ProcessingMode processingMode;
private final TopologyVersion version;
private final TaskExecutionMetadata taskExecutionMetadata;
private final Set<String> pausedTopologies;

private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Expand Down Expand Up @@ -104,14 +105,15 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
this.processingMode = StreamsConfigUtils.processingMode(config);
this.config = config;
this.log = LoggerFactory.getLogger(getClass());
this.pausedTopologies = ConcurrentHashMap.newKeySet();

builders = new ConcurrentSkipListMap<>();
if (builder.hasNamedTopology()) {
builders.put(builder.topologyName(), builder);
} else {
builders.put(UNNAMED_TOPOLOGY, builder);
}
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
}

public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
Expand All @@ -120,12 +122,13 @@ public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBui
this.processingMode = StreamsConfigUtils.processingMode(config);
this.config = config;
this.log = LoggerFactory.getLogger(getClass());
this.pausedTopologies = ConcurrentHashMap.newKeySet();

this.builders = builders;
if (builders.isEmpty()) {
log.info("Created an empty KafkaStreams app with no topology");
}
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), pausedTopologies);
}

// Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix
Expand Down Expand Up @@ -257,6 +260,35 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl<Void> future, fina
}
}

/**
 * Pauses a topology by name
 * @param topologyName Name of the topology to pause. If null, the `UNNAMED_TOPOLOGY` is paused,
 *                     matching how {@link #isPaused(String)} interprets a null name.
 */
public void pauseTopology(final String topologyName) {
    // The paused set is created via ConcurrentHashMap.newKeySet(), which rejects null elements,
    // so a null name is normalized first instead of surfacing as a NullPointerException.
    pausedTopologies.add(topologyName == null ? UNNAMED_TOPOLOGY : topologyName);
}

/**
 * Checks if a given topology is paused.
 * @param topologyName Name of the topology to check; a null name is treated as the `UNNAMED_TOPOLOGY`.
 * @return true if the (possibly defaulted) topology name is in the set of paused topologies.
 */
public boolean isPaused(final String topologyName) {
    final String nameToCheck = topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
    return pausedTopologies.contains(nameToCheck);
}

/**
 * Resumes a topology by name
 * @param topologyName Name of the topology to resume. If null, the `UNNAMED_TOPOLOGY` is resumed,
 *                     matching how {@link #isPaused(String)} interprets a null name.
 */
public void resumeTopology(final String topologyName) {
    // The paused set is created via ConcurrentHashMap.newKeySet(), which rejects null arguments,
    // so a null name is normalized first instead of surfacing as a NullPointerException.
    pausedTopologies.remove(topologyName == null ? UNNAMED_TOPOLOGY : topologyName);
}

/**
* Removes the topology and registers a future that listens for all threads on the older version to see the update
*/
Expand Down
Loading

0 comments on commit 7ed3748

Please sign in to comment.