Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fail scalable push query if error is detected in subscribed persistent query #7996

Merged
merged 20 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ProcessingQueue {
private final int queueSizeLimit;
private boolean closed = false;
private boolean droppedRows = false;
private boolean hasError = false;
private Runnable newRowCallback = () -> { };

public ProcessingQueue(final QueryId queryId) {
Expand Down Expand Up @@ -106,6 +107,14 @@ public synchronized boolean hasDroppedRows() {
return droppedRows;
}

/**
 * Raises the error flag on this queue. The write happens while holding the
 * queue's monitor, matching the other synchronized accessors of this class.
 */
public synchronized void onError() {
  this.hasError = true;
}

/**
 * Returns the error flag that {@link #onError()} sets.
 *
 * @return the current value of the flag, read while holding the queue's monitor
 */
public synchronized boolean getHasError() {
  return this.hasError;
}

/**
 * @return the query id held by this queue
 */
public QueryId getQueryId() {
  return this.queryId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,8 @@ public boolean isClosed() {

private void maybeNext(final Publisher publisher) {
List<?> row;
while ((row = (List<?>)next()) != null) {
if (dataSourceOperator.droppedRows()) {
closeInternal();
publisher.reportDroppedRows();
break;
} else {
publisher.accept(row);
}
while (!isErrored(publisher) && (row = (List<?>)next()) != null) {
publisher.accept(row);
}
if (!closed) {
if (timer >= 0) {
Expand All @@ -103,6 +97,19 @@ private void maybeNext(final Publisher publisher) {
}
}

/**
 * Checks the data source operator for a terminal condition and, if one is found,
 * closes this plan and reports the condition through the publisher.
 *
 * <p>Dropped rows are checked first; {@code hasError()} is only consulted when no
 * rows were dropped.
 *
 * @param publisher the publisher that is told about the failure
 * @return {@code true} if a failure was reported, {@code false} otherwise
 */
private boolean isErrored(final Publisher publisher) {
  if (dataSourceOperator.droppedRows()) {
    closeInternal();
    publisher.reportDroppedRows();
    return true;
  }

  if (!dataSourceOperator.hasError()) {
    return false;
  }

  closeInternal();
  publisher.reportHasError();
  return true;
}

private void open(final Publisher publisher) {
VertxUtils.checkContext(context);
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
Expand Down Expand Up @@ -151,5 +158,9 @@ public Publisher(final Context ctx) {
/**
 * Passes a {@link RuntimeException} with the message "Dropped rows" to
 * {@code sendError}.
 */
public void reportDroppedRows() {
  final RuntimeException droppedRowsError = new RuntimeException("Dropped rows");
  sendError(droppedRowsError);
}

/**
 * Passes a {@link RuntimeException} with the message "Persistent query has error"
 * to {@code sendError}.
 */
public void reportHasError() {
  final RuntimeException persistentQueryError =
      new RuntimeException("Persistent query has error");
  sendError(persistentQueryError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ private void handleRow(final Record<Object, GenericRow> record) {
}
}

/**
 * Calls {@link ProcessingQueue#onError()} on every queue currently held in
 * {@code processingQueues}. Runs while holding this object's monitor.
 */
public synchronized void onError() {
  processingQueues.values().forEach(ProcessingQueue::onError);
}

@Override
public Processor<Object, GenericRow, Void, Void> get() {
return new PeekProcessor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,9 @@ public void setNewRowCallback(final Runnable newRowCallback) {
public boolean droppedRows() {
return processingQueue.hasDroppedRows();
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation returns the error flag of the underlying processing queue.
 */
@Override
public boolean hasError() {
  final boolean queueHasError = processingQueue.getHasError();
  return queueHasError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface PushDataSourceOperator {

// If rows have been dropped.
boolean droppedRows();

// If an error has been detected.
// NOTE(review): what counts as an error is left to implementations — confirm.
boolean hasError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public void acceptRow(final List<?> key, final GenericRow value) {
public boolean acceptRowNonBlocking(final List<?> key, final GenericRow value) {
try {
if (!callback.shouldQueue()) {
return false;
callback.onQueued();
nateab marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

final KeyValue<List<?>, GenericRow> row = keyValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
Expand All @@ -40,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

Expand Down Expand Up @@ -105,7 +107,7 @@ public PersistentQueryMetadataImpl(
maxQueryErrorsQueueSize,
retryBackoffInitialMs,
retryBackoffMaxMs,
listener
new ScalablePushQueryListener(listener, scalablePushRegistry)
);
this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource");
this.schemas = requireNonNull(schemas, "schemas");
Expand Down Expand Up @@ -226,4 +228,32 @@ public synchronized void stop() {
public Optional<ScalablePushRegistry> getScalablePushRegistry() {
return scalablePushRegistry;
}

private static final class ScalablePushQueryListener implements Listener {
nateab marked this conversation as resolved.
Show resolved Hide resolved
private final Listener listener;
private final Optional<ScalablePushRegistry> scalablePushRegistry;

/**
 * Wraps a listener so that query errors can also be forwarded to the scalable
 * push registry.
 *
 * <p>Both arguments are null-checked eagerly. Because this wrapper is itself
 * never null, a null delegate would otherwise slip past construction and only
 * surface later as a {@link NullPointerException} inside a callback.
 *
 * @param listener the listener every callback is delegated to; must not be null
 * @param scalablePushRegistry the registry to notify on error, if any; must not be null
 * @throws NullPointerException if either argument is null
 */
private ScalablePushQueryListener(final Listener listener,
    final Optional<ScalablePushRegistry> scalablePushRegistry) {
  this.listener = requireNonNull(listener, "listener");
  this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry");
}

/**
 * Forwards the error to the wrapped listener first and then, when a scalable push
 * registry is present, calls {@code onError()} on it.
 */
@Override
public void onError(final QueryMetadata queryMetadata, final QueryError error) {
  listener.onError(queryMetadata, error);
  if (scalablePushRegistry.isPresent()) {
    scalablePushRegistry.get().onError();
  }
}

/**
 * Pure pass-through: hands the state transition to the wrapped listener unchanged.
 */
@Override
public void onStateChange(
    final QueryMetadata queryMetadata,
    final State before,
    final State after
) {
  listener.onStateChange(queryMetadata, before, after);
}

/**
 * Pure pass-through: hands the close notification to the wrapped listener unchanged.
 */
@Override
public void onClose(final QueryMetadata queryMetadata) {
  listener.onClose(queryMetadata);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaught
Throwables.getStackTraceAsString(e),
errorType
);
listener.onError(this, queryError);
queryErrors.add(queryError);
LOG.error(
"Unhandled exception caught in streams thread {}. ({})",
Thread.currentThread().getName(),
errorType,
e
);
listener.onError(this, queryError);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why was it required to move this call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was failing some tests in SecureIntegrationTest that called assertQueryFailsWithUserError. It seemed that the queryError was not being added, so the tests failed; when I moved the listener call to the end, they passed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We looked into this offline and this line can be reverted.

}
retryEvent.backOff();
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void setLimitHandler(final LimitHandler limitHandler) {

/**
 * Passes {@code handler::handle} to {@code onException}.
 *
 * @param handler the streams uncaught exception handler whose {@code handle} method is forwarded
 */
@Override
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) {
// The handler is forwarded to onException rather than ignored.
// NOTE(review): how onException invokes the callback is defined outside this method — confirm.
onException(handler::handle);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,13 @@ public void shouldHitLimit() {
assertThat(queue.poll(), nullValue());
assertThat(queue.hasDroppedRows(), is(true));
}

@Test
public void shouldDefaultToFalseForHasError() {
  // Given: a freshly constructed queue on which onError() is never invoked
  final QueryId queryId = new QueryId("a");
  final ProcessingQueue processingQueue = new ProcessingQueue(queryId);

  // When:
  final boolean hasError = processingQueue.getHasError();

  // Then: the error flag is unset
  assertThat(hasError, is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void shouldStopOnDroppedRows() throws InterruptedException {
final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId,
scalablePushRegistry, pushDataSourceOperator, context);
doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture());
when(pushDataSourceOperator.droppedRows()).thenReturn(false, true);
when(pushDataSourceOperator.droppedRows()).thenReturn(false, false, true);

final BufferedPublisher<List<?>> publisher = pushPhysicalPlan.execute();
final TestSubscriber<List<?>> subscriber = new TestSubscriber<>();
Expand Down Expand Up @@ -124,6 +124,39 @@ public void shouldStopOnDroppedRows() throws InterruptedException {
assertThat(pushPhysicalPlan.isClosed(), is(true));
}

@Test
public void shouldStopOnHasError() throws InterruptedException {
final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId,
scalablePushRegistry, pushDataSourceOperator, context);
doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture());
when(pushDataSourceOperator.hasError()).thenReturn(false, false, true);

final BufferedPublisher<List<?>> publisher = pushPhysicalPlan.execute();
final TestSubscriber<List<?>> subscriber = new TestSubscriber<>();
publisher.subscribe(subscriber);

context.owner().setPeriodic(50, timerId -> {
if (runnableCaptor.getValue() == null) {
return;
}
when(root.next()).thenReturn(ROW1, ROW2, null);

runnableCaptor.getValue().run();
runnableCaptor.getValue().run();

context.owner().cancelTimer(timerId);
});

while (subscriber.getError() == null) {
Thread.sleep(100);
}

assertThat(subscriber.getError().getMessage(), containsString("Persistent query has error"));
nateab marked this conversation as resolved.
Show resolved Hide resolved
assertThat(subscriber.getValues().size(), is(1));
assertThat(subscriber.getValues().get(0), is(ROW1));
assertThat(pushPhysicalPlan.isClosed(), is(true));
}

public static class TestSubscriber<T> implements Subscriber<T> {

private Subscription sub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,17 @@ public void shouldCreate_noApplicationServer() {
// Then
assertThat(registry.isPresent(), is(false));
}

@Test
public void shouldCallOnErrorOnQueue() {
  // Given: a registry with one registered processing queue
  final ScalablePushRegistry pushRegistry =
      new ScalablePushRegistry(locator, SCHEMA, false, false);
  pushRegistry.register(processingQueue);

  // When: the registry is told about an error
  pushRegistry.onError();

  // Then: the registered queue is notified
  verify(processingQueue).onError();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class PeekStreamOperatorTest {
private TableRow row2;
@Mock
private Runnable newRowCallback;
@Mock
private ProcessingQueue processingQueue;

@Test
public void shouldGetRowsFromOperator() {
Expand All @@ -57,4 +59,15 @@ public void shouldGetRowsFromOperator() {
locator.close();
verify(registry, times(1)).unregister(processingQueue);
}

@Test
public void shouldDefaultToFalseForHasErrorOnQueue() {
  // Given: an operator built over the test registry and data source node
  final PeekStreamOperator operator =
      new PeekStreamOperator(registry, dataSourceNode, QUERY_ID);

  // When: it is opened
  operator.open();

  // Then: no error is reported
  final boolean hasError = operator.hasError();
  assertThat(hasError, is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryError;
Expand Down Expand Up @@ -91,6 +92,8 @@ public class PersistentQueryMetadataTest {
private ProcessingLogger processingLogger;
@Mock
private Listener listener;
@Mock
private ScalablePushRegistry scalablePushRegistry;

private PersistentQueryMetadata query;

Expand Down Expand Up @@ -125,7 +128,7 @@ public void setUp() {
0L,
0L,
listener,
Optional.empty()
Optional.of(scalablePushRegistry)
);

query.initialize();
Expand Down