
KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies #12161

Merged · 43 commits · Jun 16, 2022

Conversation

@jnh5y (Contributor) commented May 13, 2022:

This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies.

Added an integration test to show how pausing and resuming works.
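For reference, the new calls as exercised in this PR's tests look roughly like this (a minimal sketch; topic names and configuration are illustrative, not from the PR):

    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input").groupByKey().count().toStream().to("output");
    final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);

    kafkaStreams.pause();   // may be called before start(), so the instance
    kafkaStreams.start();   // comes up with processing paused
    assertTrue(kafkaStreams.isPaused());

    kafkaStreams.resume();  // processing picks up again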

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax added labels: streams, kip (Requires or implements a KIP) on May 13, 2022
@wcarlson5 (Contributor) left a comment:

Looking good for the most part; I just had a couple of questions.

I gave the tests a quick pass. I realize you are still working on them, so feel free to ignore that part until you are ready.

@jnh5y marked this pull request as ready for review on May 16, 2022 15:06
@jnh5y changed the title from "DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies" to "KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies" on May 16, 2022
Comment on lines +213 to +214
kafkaStreams.pause();
kafkaStreams.start();
Contributor Author (@jnh5y):

I think the names here are a little confusing. With context, it makes sense that this is how to start a KafkaStreams instance with processing paused.

If anyone has a naming suggestion here, I'm very open to it!

@cadonna (Contributor) left a comment:

Thanks for the PR @jnh5y !

I would like to see a bit more unit testing.

I was wondering whether the pausing of tasks in restoration really works. See my comment in the StoreChangelogReader.

@@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore(taskManager.tasks());
// After KAFKA-13873, we only restore the not paused tasks.
changelogReader.restore(taskManager.notPausedTasks());
Contributor:

This should also be verified in a unit test with a mock changelog reader.
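For illustration, such a verification might look roughly like this (a sketch using EasyMock, as StreamThreadTest already does; the thread setup is elided and the wiring is assumed):

    final ChangelogReader changelogReader = EasyMock.mock(ChangelogReader.class);
    // restore() should be invoked with the non-paused tasks only
    changelogReader.restore(EasyMock.anyObject());
    EasyMock.expectLastCall().atLeastOnce();
    EasyMock.replay(changelogReader);
    // ... create a StreamThread with the mocked reader, pause the topology,
    // and run one iteration of the poll loop ...
    EasyMock.verify(changelogReader);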

Contributor Author (@jnh5y):

I think I've covered this with a new test in StreamThreadTest.

this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
}

public boolean canProcessTask(final Task task, final long now) {
Contributor:

I am aware that there are no unit tests for this class, but there are enough different code paths to justify adding unit tests for this method.

Contributor Author (@jnh5y):

I've added some unit tests for TaskExecutionMetadata here: 0a37879

@@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() {
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
for (final Task task : tasks.notPausedTasks()) {
Contributor:

Here, too, unit tests would be great and easily doable.

Contributor Author (@jnh5y):

@cadonna OK, I tried to add some unit tests for TaskExecutor, and I found I needed to mock quite a few things. Does that seem right, or is there an easier way?

Contributor Author (@jnh5y):

Note to self: Use notPausedActiveTasks.

@@ -273,6 +274,12 @@ Collection<Task> allTasks() {
return readOnlyTasks;
}

Collection<Task> notPausedTasks() {
Contributor:

Unit tests would be great!

Contributor:

Unit tests would still be great!

Contributor Author (@jnh5y):

@cadonna Same deal here: I started trying to test Tasks instances and went down a path of mocking quite a few of the internal classes.

I think I'm either missing an easier approach, or testing these functions directly may require a decent amount of effort.

* Pauses a topology by name
* @param topologyName Name of the topology to pause
*/
public void pauseTopology(final String topologyName) {
Contributor:

These methods could also be unit tested really easily.
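A minimal sketch of what such a test could look like (pauseTopology comes from the diff above; the instance name and the resumeTopology/isPaused counterparts are assumptions for illustration):

    @Test
    public void shouldPauseAndResumeTopologyByName() {
        // topologyMetadata is a hypothetical instance of the class under test
        topologyMetadata.pauseTopology("topology-a");
        assertTrue(topologyMetadata.isPaused("topology-a"));

        topologyMetadata.resumeTopology("topology-a");
        assertFalse(topologyMetadata.isPaused("topology-a"));
    }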

Contributor Author (@jnh5y):

Added tests here: 34ec8ac

@cadonna (Contributor) left a comment:

Thanks for the updates, @jnh5y!

I did a quick pass. Here is my feedback.

Comment on lines 838 to 843
try {
streams.cleanUp();
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException expected) {
assertEquals("Cannot clean up while running.", expected.getMessage());
}
Contributor:

Wouldn't it be simpler to use assertThrows()?
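For reference, the assertThrows() form would be roughly:

    final IllegalStateException expected = assertThrows(
        IllegalStateException.class,
        streams::cleanUp
    );
    assertEquals("Cannot clean up while running.", expected.getMessage());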

Contributor Author (@jnh5y):

I'll admit to it; I just copy-pasted the test directly above:

try {
streams.cleanUp();
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException expected) {
assertEquals("Cannot clean up while running.", expected.getMessage());
}

Is following the existing codebase OK, or shall I spend some time cleaning it up?

Contributor:

I would use assertThrows() in the new test. The existing codebase also uses it in some tests like shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState.
I would open a new PR to refactor the tests that use try-fail-catch. That is optional.

Contributor Author (@jnh5y):

Fixed in 759d3a8

thread.runOnce();

assertEquals(10L, store1.approximateNumEntries());
assertEquals(4L, store2.approximateNumEntries());
Contributor:

Why 4 and not 5?

Contributor Author (@jnh5y):

Good question. Short answer: I'm not certain. I copied the test from https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1684-L1762 since it deals with updating standby tasks.

My guess is that the method really does return an approximate number of entries, as its name suggests. I tried to verify that, but I got a little lost in the unit test's complexity.

Contributor:

Ah, I see! Could you please extract the common code (especially the setup code) of those two tests into a method?
We could consider just checking for greater than 0.
In addition to the standbys, we should also test whether active tasks in restoration are paused.

Contributor Author (@jnh5y):

For the refactoring, I've taken a pass at it here: a3bd8ae.

I tend to prefer to add new things consistently with existing code, so I'm hesitating to change the check. I'm fine either way.

I'll add something for active tasks if that's the way we go.

// One second after the error, task1 cannot process, task2 can.
Assert.assertFalse(metadata.canProcessTask(mockTask1, 1000));
Assert.assertTrue(metadata.canProcessTask(mockTask2, 1000));

Contributor:

Could you add a case where the time since the error is exactly the backoff time (i.e. 5000)?
I would prefer to put the times 1000, 5000, and 10000 into variables with meaningful names instead of adding comments.
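A sketch of the suggested shape (the constant's value comes from this discussion; whether the boundary case at exactly CONSTANT_BACKOFF_MS can process depends on the comparison the implementation uses):

    private static final long CONSTANT_BACKOFF_MS = 5000L;

    // just before the backoff elapses: task1 still cannot process, task2 can
    Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS - 1));
    Assert.assertTrue(metadata.canProcessTask(mockTask2, CONSTANT_BACKOFF_MS - 1));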

Contributor Author (@jnh5y):

Sure. I took a crack at updating the test; lemme know if you'd prefer different names.

Contributor:

Could you please use CONSTANT_BACKOFF_MS - 1 for the verifications on line 111 and 112? I think it makes the intent clearer.

Contributor Author (@jnh5y):

Fixed in cfb18f5

Comment on lines 432 to 434
// for restoring active and updating standby we may prefer different poll time
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
Contributor:

I think these comments should be moved before the call to restoreConsumer.poll(). BTW, this again confirms that inline comments are a poor way to document code, most of the time.

for (final TopicPartition partition : polledRecords.partitions()) {
bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
}
// JNH: Fix this?
Contributor:

Please do not forget to remove those comments here and elsewhere.

@cadonna (Contributor) left a comment:

@jnh5y Thank you for the updates!

Here is my feedback:


Comment on lines 194 to 197
// Verify no output somehow?
// Is there a better way to show this?
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 0),
CoreMatchers.equalTo(Collections.emptyList()));
Contributor:

As you write in the comments, this is not straightforward. I think it is not good to just wait for no output, for two reasons:

  • it increases the duration of the test unnecessarily
  • it opens the door for flakiness, because we rely only on a duration for verification

I have the following idea. You create two identical KafkaStreams clients (be careful to specify distinct state directories for each). Both clients read from the same input topics but write to distinct output topics. You do the initial setup verifications for both. Then you pause one of the clients and wait until the other client has produced a given number of records to its output topic. When that has happened, you verify that the paused client has not written anything to its output topic. The assumption is that both clients produce at the same rate to their output topics when they are not paused. If one of the two is paused and does not produce any records to its output topic in the time the other client produces a certain number of records, we can assume the pausing works.


// Verify that consumers read new data -- AKA, there is no lag.
final Map<String, Map<Integer, LagInfo>> lagMap = kafkaStreams.allLocalStorePartitionLags();
assertNoLag(lagMap);
Contributor:

Just to be clear, this works as long as the input buffers for polled records are not full. I think it is good enough for this test, though.

Contributor Author (@jnh5y):

Ayup, that's my understanding, and it shows that the active task consumers are still reading (up until their buffers are full).



@Test
public void shouldPauseAndResumeKafkaStreams() throws Exception {
Contributor:

Could you please try to extract common code to reusable methods?

Contributor Author (@jnh5y):

Yes. I've extracted some reusable methods.

@@ -479,6 +485,26 @@ public void restore(final Map<TaskId, Task> tasks) {
}
}

private void updateStandbyPartitions(final Map<TaskId, Task> tasks,
Contributor:

I think you forgot to also pause the active tasks in restoration.

Contributor:

I'm wondering if we can make this a more general rule, like:

  1. When the reader is in the Restore_Active state, i.e. there is at least one active task that needs restoration (for simplicity, say exactly one), say taskA: if taskA is paused, we should be able to transition to Update_Standby.
  2. When the reader is in the Update_Standby state, and one active task, say taskA, is resumed, we should be able to transition to Restore_Active.

I know it does not matter for now, since we always pause all tasks with the current APIs, but this is extensible for finer-grained control in the future.

Comment on lines 499 to 502
restoreConsumer.resume(Collections.singleton(partition));
} else {
restoreConsumer.pause(Collections.singleton(partition));
}
Contributor:

I think a better way would be to collect the partitions to resume and pause and then call restoreConsumer.resume() and restoreConsumer.pause() just once, each with its collection of partitions.
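A sketch of the batched form (the collection and predicate names below are assumptions standing in for the PR's actual logic):

    final Set<TopicPartition> partitionsToResume = new HashSet<>();
    final Set<TopicPartition> partitionsToPause = new HashSet<>();
    for (final TopicPartition partition : restoringPartitions) {  // hypothetical collection
        if (shouldRestore(partition)) {                           // hypothetical predicate
            partitionsToResume.add(partition);
        } else {
            partitionsToPause.add(partition);
        }
    }
    restoreConsumer.resume(partitionsToResume);
    restoreConsumer.pause(partitionsToPause);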

Contributor Author (@jnh5y):

Done in cb6d6f1.

…seResumeIntegrationTest.java

Co-authored-by: Bruno Cadonna <[email protected]>
Comment on lines 440 to 494
@Test
public void shouldWorkAcrossInstances() throws Exception {
// Create data on input topics (would need at least two partitions)
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);

// Start two instances paused
// Create KafkaStreams instance
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_1);

kafkaStreams = new KafkaStreams(builder.build(props()), props());

// Start KafkaStream with paused processing.
kafkaStreams.pause();
kafkaStreams.start();
// Check for rebalancing instead?
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams.isPaused());

// Create KafkaStreams instance
final StreamsBuilder builder2 = new StreamsBuilder();
builder2.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_2);

kafkaStreams2 = new KafkaStreams(builder2.build(props()), props());

// Start KafkaStream with paused processing.
kafkaStreams2.pause();
kafkaStreams2.start();
// Check for rebalancing instead?
//waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams2.isPaused());

// Verify no data?
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 0),
CoreMatchers.equalTo(Collections.emptyList()));

// -- Verify that each instance is in rebalancing (after change to pause active task restoration)

System.out.println("JNH: calling close: " + kafkaStreams.state());
// Close the other -- this causes a rebalance
kafkaStreams2.close();
waitForApplicationState(singletonList(kafkaStreams2), State.NOT_RUNNING, STARTUP_TIMEOUT);

System.out.println("JNH: called close: " + kafkaStreams.state());

// Resume paused instance
kafkaStreams.resume();
System.out.println("JNH: called resume " + kafkaStreams.state());
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
System.out.println("JNH: streams is running again " + new Date());

// Observe all data processed
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3),
CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
}
Contributor Author (@jnh5y):

@cadonna I added this as a test to show what happens between multiple clients. I am noticing that this test was taking 45+ seconds to run.

It looks like it is hitting the ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG and I'm trying to think through why that may be happening. Any ideas?

@cadonna (Contributor) left a comment:

@jnh5y Thank you for the updates!

Here is my feedback!

@Test
public void testCanProcessWithoutNamedTopologies() {
final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
Contributor:

Out of curiosity, why do you use a ConcurrentHashMap here?

Contributor Author (@jnh5y):

I used ConcurrentHashMap since that's what pausedTopologies is created as elsewhere.

In the unit tests it can be a HashSet, so I switched to that.

Comment on lines 94 to 146
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Contributor:

I think your IDE reformatted the imports. The same occurred in TaskExecutionMetadataTest. Could you also check the other files? We use more or less the following import order:

all other imports non-static (sorted alphabetically)

java.* (sorted alphabetically)
javax.* (sorted alphabetically)

static imports (sorted alphabetically)

However, we are not really consistent across files. Nevertheless, we should try to keep that order.

@@ -225,7 +225,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

<suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
Contributor:

In general, I think we should try to avoid adding suppressions. But I also see that StreamThreadTest would need quite some love at the moment which is not the intent of this PR.

produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
assertNoLag(kafkaStreams);

waitUntilStreamsHasPolled(kafkaStreams, 2);
Contributor:

I like your approach!

Contributor Author (@jnh5y):

Thanks! It was either going to be a good idea or prove too hacky! Glad you like it!

Contributor:

It is hacky, but still a good idea 🙂

assertTrue(kafkaStreams.isPaused());

produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
assertNoLag(kafkaStreams);
Contributor:

I just realized that this method computes the lag of the store with respect to the changelog partition, not the input partitions. Was this intentional? I think the lag of the store with respect to the changelog topic will always be zero in this case, because in normal mode the Streams client writes to the changelog topic, so it is always up-to-date.

I thought that this method verified that the main consumer does not have any lag on the input partitions, which would tell us that the main consumer polled data although it was paused, which is the expected behavior. This method, together with waitUntilStreamsHasPolled(kafkaStreams, 2); and assertTopicSize(OUTPUT_STREAM_1, <same size as before the call to pause>);, would tell us that although the Streams client polled data and went through the poll loop, no data was produced to the output topic. For that you should also set the cache size to 0 with STATESTORE_CACHE_MAX_BYTES_CONFIG.

Contributor Author (@jnh5y):

First, good catch! It wasn't intentional... I thought that I had found the right calls to show no lag for consumers...

I tried to switch over to verifying that there was no consumer lag, and I don't think we can sensibly do so.

(If I understand correctly) The consumer offsets are only committed after the input records have been processed. Since there is no processing, the consumer offsets will appear to be behind. (I verified this locally.)

My conclusion is that the method isn't checking anything useful, so I am planning on removing it.

Contributor:

I agree that you should remove assertNoLag().

Contributor Author (@jnh5y):

Removed!


awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
}

Contributor:

Could you please also add a test that verifies that active tasks in restoration and standbys are paused? Something like: you start two Streams clients with 1 standby. If you have one partition, one client should get the active stateful task and the other should get the standby task. The clients process some data and write some data into their states. Then shut down the Streams clients and wipe out the local state. Finally, start both clients paused and verify that the lag of the local stores stays constant and greater than zero for a couple of poll-loop iterations.

Contributor Author (@jnh5y):

I like the idea, but based on https://github.com/apache/kafka/pull/12161/files#r893999817 I'm not sure we can sensibly check something useful.

It seems like the offsets (at least for the main consumer) will not be committed, so they will definitely be at 0 since processing is paused.

For the restore consumer, does it have a groupId? Would we be able to see anything about its state?

Contributor:

The comment https://github.com/apache/kafka/pull/12161/files#r893999817 is unrelated here. In my proposal, you would assert that the lag between the local state store and the changelog topic does not decrease. You can measure that in a similar way as you already do in assertNoLag().
The restore consumer does not have a group, since the partitions are manually assigned to the consumer. If there is no group, there are no committed offsets.

Contributor Author (@jnh5y):

I'm still a little light on the details. I'll give it a try here in a little bit.

Contributor Author (@jnh5y):

Ok, I added a test named pausedTopologyShouldNotRestoreStateStores that hopefully will cover things.

kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
kafkaStreams2.pause();
kafkaStreams2.start();
assertTrue(kafkaStreams2.isPaused());
Contributor:

Do you also want to wait for RUNNING for the second KafkaStreams instance?

Contributor Author (@jnh5y):

Yes, I think that's fair.

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
Contributor:

I think you can remove this config. It is not relevant for what this test tests.

}

@Test
public void pauseResumehouldWorkAcrossInstances() throws Exception {
Contributor:

According to the builds, this test seems flaky. I think the reason is that the cache (in Streams, not the RocksDB cache) of the state stores is not set to zero. When the cache is not set to zero, the number of results that are sent downstream is not deterministic, because some intermediate results might be sent downstream and some not. I tried setting the cache to zero and was able to run the test 100+ times in a row without failure, whereas with the cache > 0, the test failed much earlier.
Note that by changing the cache size, the results that you verify also change.
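For reference, the fix amounts to a single config line in the test's properties (using the constant named earlier in this review):

    properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);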

Contributor Author (@jnh5y):

Thanks. I was having trouble sifting through the build info. I've updated the test to set the state store cache size to 0.

I'll watch the CI jobs to see if they are green after my last push. (They seemed to be OK as I was working today.)

Contributor:

I think you fixed it! 🎉

@cadonna (Contributor) left a comment:

@jnh5y Thank you for the updates!

Here is my feedback.

Comment on lines 367 to 368
assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());
Contributor:

Why are you verifying for emptiness? I would expect that there are entries for the state stores with a lag greater than 0.

Contributor:

I played around a bit with the test, and indeed, if you add a Thread.sleep(2000) before these asserts, the test fails because the returned map is not empty. That means the assignment was not finished before the asserts were called.

Contributor:

You could do something like:

        waitForApplicationState(Arrays.asList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
        waitForCondition(
            () -> !kafkaStreams.allLocalStorePartitionLags().isEmpty(),
            "Lags for local store partitions were not found within the timeout!");
        waitUntilStreamsHasPolled(kafkaStreams, 2);
        final long stateStoreLag1 = kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
        waitUntilStreamsHasPolled(kafkaStreams, 2);
        final long stateStoreLag2 = kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
        assertTrue(stateStoreLag1 > 0);
        assertEquals(stateStoreLag1, stateStoreLag2);

This code just considers one Streams client. You need to add Materialized.as("test-store") to the call to count() in your topology.
As soon as you activated the standbys, you need to do the same for the second Streams client.

Contributor Author (@jnh5y):

Thank you! I've added these changes.

Comment on lines 343 to 344
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
Contributor:

If you do not use standby tasks, there is no reason to use two Kafka Streams clients. I would propose to use one standby only for this test. For that you need to set num.standby.replicas to 1. That has the effect that one client gets the active store assigned and the other gets the standby store assigned.

Contributor Author (@jnh5y):

My mistake; I've updated the test.

@@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
}
}

private void updateStandbyPartitions(final Map<TaskId, Task> tasks,
Contributor:

Do not forget to rename this method to something more meaningful.
Proposal: pauseResumePartitions()

@cadonna (Contributor) left a comment:

@jnh5y Thank you for the updates!

LGTM!

Had just one nit.

Thank you for your patience!

Comment on lines 372 to 373
assertStreamsLagStaysConstant(kafkaStreams);
assertStreamsLagStaysConstant(kafkaStreams2);
Contributor:

nit: assertStreamsLagStaysConstant() -> assertStreamsLocalStoreLagStaysConstant()

Contributor Author (@jnh5y):

Changed it!

@jnh5y (Contributor Author) commented Jun 15, 2022:

> @jnh5y Thank you for the updates!
>
> LGTM!
>
> Had just one nit.
>
> Thank you for your patience!

@cadonna Thank you for pushing me and helping me learn more about streams!

@cadonna cadonna merged commit 7ed3748 into apache:trunk Jun 16, 2022
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Jun 30, 2022
…ache#12161)

This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834).

Co-authored-by: Bruno Cadonna <[email protected]>

Reviewers: Bonnie Varghese <[email protected]>, Walker Carlson <[email protected]>, Guozhang Wang <[email protected]>, Bruno Cadonna <[email protected]>
Labels: kip (Requires or implements a KIP), streams

6 participants