Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixes race condition exposing uninitialized query #7627

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ private void registerQuery(
unregisterQuery(oldQuery);
}

// Initialize the query before it's exposed to other threads via the map/sets.
persistentQuery.initialize();
persistentQueries.put(queryId, persistentQuery);
if (createAsQuery) {
createAsQueries.put(persistentQuery.getSinkName(), queryId);
Expand All @@ -278,10 +280,12 @@ private void registerQuery(
insertQueries.computeIfAbsent(sourceName,
x -> Collections.synchronizedSet(new HashSet<>())).add(queryId));
}
} else {
// Initialize the query before it's exposed to other threads via allLiveQueries.
query.initialize();
}
allLiveQueries.add(query);
notifyCreate(serviceContext, metaStore, query);
query.initialize();
}

private void unregisterQuery(final QueryMetadata query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ Optional<Materialization> getMaterialization(
QueryContext.Stacker contextStacker
);

void restart();

void stop();

StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

Expand All @@ -54,9 +53,9 @@ public class PersistentQueryMetadataImpl
private final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder;
private final Optional<ScalablePushRegistry> scalablePushRegistry;
private final ProcessingLogger processingLogger;

private Optional<MaterializationProvider> materializationProvider;
private ProcessingLogger processingLogger;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadataImpl(
Expand Down Expand Up @@ -196,23 +195,6 @@ public Optional<Materialization> getMaterialization(
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
}

/**
 * Restarts this query by swapping in a Kafka Streams instance obtained from
 * {@code buildKafkaStreams()}.
 *
 * <p>The method is {@code synchronized}. In order, it calls {@code closeKafkaStreams()},
 * obtains an instance from {@code buildKafkaStreams()}, recomputes
 * {@code materializationProvider} from that instance and {@code getTopology()}, hands the
 * instance to {@code resetKafkaStreams}, and finally calls {@link #start()}.
 *
 * @throws IllegalStateException if {@code isClosed()} returns {@code true}
 */
public synchronized void restart() {
// Refuse to restart a closed query; this check runs before any streams state is touched.
if (isClosed()) {
throw new IllegalStateException(String.format(
"Query with application id %s is already closed, cannot restart.",
getQueryApplicationId()));
}

closeKafkaStreams();

final KafkaStreams newKafkaStreams = buildKafkaStreams();
// materializationProviderBuilder is an Optional, so the provider stays empty when no builder
// is present; otherwise it is rebuilt against the new streams instance.
materializationProvider = materializationProviderBuilder.flatMap(
builder -> builder.apply(newKafkaStreams, getTopology()));

// The new instance is installed before start() is called.
resetKafkaStreams(newKafkaStreams);
start();
}

/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.LagInfo;
Expand Down Expand Up @@ -70,9 +69,10 @@ public class QueryMetadataImpl implements QueryMetadata {
private final RetryEvent retryEvent;
private final Listener listener;

private boolean everStarted = false;
protected boolean closed = false;
private StreamsUncaughtExceptionHandler uncaughtExceptionHandler = this::uncaughtHandler;
private volatile boolean everStarted = false;
protected volatile boolean closed = false;
// These fields don't need synchronization because they are initialized in initialize() before
// the object is made available to other threads.
private KafkaStreams kafkaStreams;
private boolean initialized = false;

Expand Down Expand Up @@ -146,7 +146,6 @@ public long read() {
this.closeTimeout = other.closeTimeout;
this.queryId = other.getQueryId();
this.errorClassifier = other.errorClassifier;
this.uncaughtExceptionHandler = other.uncaughtExceptionHandler;
this.everStarted = other.everStarted;
this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0);
this.retryEvent = new RetryEvent(
Expand Down Expand Up @@ -222,7 +221,6 @@ public String getStatementString() {
}

/**
 * Stores {@code handler} in the {@code uncaughtExceptionHandler} field and registers it on the
 * current {@code kafkaStreams} instance.
 *
 * @param handler the handler to store and to pass to {@code kafkaStreams}
 */
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) {
this.uncaughtExceptionHandler = handler;
// NOTE(review): kafkaStreams is assigned outside this method; presumably it must already be
// non-null when this is called - confirm against the callers.
kafkaStreams.setUncaughtExceptionHandler(handler);
}

Expand Down Expand Up @@ -304,9 +302,9 @@ Listener getListener() {
return listener;
}

protected void resetKafkaStreams(final KafkaStreams kafkaStreams) {
private void resetKafkaStreams(final KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setUncaughtExceptionHandler(this::uncaughtHandler);
kafkaStreams.setStateListener((b, a) -> listener.onStateChange(this, b, a));
}

Expand All @@ -322,10 +320,6 @@ protected void closeKafkaStreams() {
}
}

/**
 * Obtains a Kafka Streams instance by passing this query's {@code topology} and
 * {@code streamsProperties} to {@code kafkaStreamsBuilder}.
 *
 * @return the instance returned by {@code kafkaStreamsBuilder.build}
 */
protected KafkaStreams buildKafkaStreams() {
return kafkaStreamsBuilder.build(topology, streamsProperties);
}

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package io.confluent.ksql.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -48,7 +46,6 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -173,42 +170,6 @@ public void shouldNotCleanUpKStreamsAppOnStop() {
verify(kafkaStreams, never()).cleanUp();
}

/**
 * Verifies that {@code restart()} closes the existing streams instance, then sets an uncaught
 * exception handler on the instance returned by the builder and starts it, and that the query
 * afterwards reports the new instance and the new materialization provider.
 */
@Test
public void shouldRestartKafkaStreams() {
final KafkaStreams newKafkaStreams = mock(KafkaStreams.class);
final MaterializationProvider newMaterializationProvider = mock(MaterializationProvider.class);

// Given:
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(newKafkaStreams);
when(materializationProviderBuilder.apply(newKafkaStreams, topology))
.thenReturn(Optional.of(newMaterializationProvider));

// When:
query.restart();

// Then:
// The InOrder spans both mocks, so close() on the old instance must be observed before the
// handler registration and start() on the new one.
final InOrder inOrder = inOrder(kafkaStreams, newKafkaStreams);
inOrder.verify(kafkaStreams).close(any());
inOrder.verify(newKafkaStreams).setUncaughtExceptionHandler(
any(StreamsUncaughtExceptionHandler.class));
inOrder.verify(newKafkaStreams).start();

assertThat(query.getKafkaStreams(), is(newKafkaStreams));
assertThat(query.getMaterializationProvider(), is(Optional.of(newMaterializationProvider)));
}

/**
 * Verifies that calling {@code restart()} after {@code close()} throws an exception whose
 * message states that the query is already closed.
 */
@Test
public void shouldNotRestartIfQueryIsClosed() {
// Given:
query.close();

// When:
// NOTE(review): this accepts any Exception; only the message check below ties the failure to
// the closed-query condition.
final Exception e = assertThrows(Exception.class, () -> query.restart());

// Then:
assertThat(e.getMessage(), containsString("is already closed, cannot restart."));
}

@Test
public void shouldCallProcessingLoggerOnError() {
// Given:
Expand Down