diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java index 05f13cc26a54..c345235e3edf 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ProcessingQueue.java @@ -37,6 +37,7 @@ public class ProcessingQueue { private final int queueSizeLimit; private boolean closed = false; private boolean droppedRows = false; + private boolean hasError = false; private Runnable newRowCallback = () -> { }; public ProcessingQueue(final QueryId queryId) { @@ -106,6 +107,14 @@ public synchronized boolean hasDroppedRows() { return droppedRows; } + public synchronized void onError() { + hasError = true; + } + + public synchronized boolean getHasError() { + return hasError; + } + public QueryId getQueryId() { return queryId; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java index e9531e9f07be..a814f08998e2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java @@ -82,14 +82,8 @@ public boolean isClosed() { private void maybeNext(final Publisher publisher) { List row; - while ((row = (List)next()) != null) { - if (dataSourceOperator.droppedRows()) { - closeInternal(); - publisher.reportDroppedRows(); - break; - } else { - publisher.accept(row); - } + while (!isErrored(publisher) && (row = (List)next()) != null) { + publisher.accept(row); } if (!closed) { if (timer >= 0) { @@ -103,6 +97,19 @@ private void maybeNext(final Publisher publisher) { } } + private boolean isErrored(final Publisher publisher) { + if (dataSourceOperator.droppedRows()) { + closeInternal(); + publisher.reportDroppedRows(); + return true; + } else if (dataSourceOperator.hasError()) { + closeInternal(); + publisher.reportHasError(); + return true; + } + return false; + } + private void open(final Publisher publisher) { VertxUtils.checkContext(context); dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher))); @@ -151,5 +158,9 @@ public Publisher(final Context ctx) { public void reportDroppedRows() { sendError(new RuntimeException("Dropped rows")); } + + public void reportHasError() { + sendError(new RuntimeException("Persistent query has error")); + } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java index 182d40f276ee..eaf9d1c5c054 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistry.java @@ -151,6 +151,12 @@ private void handleRow(final Record record) { } } + public synchronized void onError() { + for (ProcessingQueue queue : processingQueues.values()) { + queue.onError(); + } + } + @Override public Processor get() { return new PeekProcessor(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java index d990f4faa13d..984b13c9b0f7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java @@ -93,4 +93,9 @@ public void setNewRowCallback(final Runnable newRowCallback) { public boolean droppedRows() { return processingQueue.hasDroppedRows(); } + + @Override + public boolean hasError() { + return processingQueue.getHasError(); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java index 600abcc0dcef..ba2fb8ae2e61 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java @@ -29,4 +29,6 @@ public interface PushDataSourceOperator { // If rows have been dropped. boolean droppedRows(); + + boolean hasError(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java index 14b0d224984f..d9693774d570 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java @@ -135,10 +135,17 @@ public void acceptRow(final List key, final GenericRow value) { } } + /** + * Accepts the rows without blocking. + * @param key The key + * @param value The value + * @return If the row was accepted or discarded for an acceptable reason, false if it was rejected + * because the queue was full. + */ public boolean acceptRowNonBlocking(final List key, final GenericRow value) { try { if (!callback.shouldQueue()) { - return false; + return true; } final KeyValue, GenericRow> row = keyValue(key, value); @@ -153,8 +160,9 @@ public boolean acceptRowNonBlocking(final List key, final GenericRow value) { } catch (final InterruptedException e) { // Forced shutdown? Thread.currentThread().interrupt(); + return false; } - return false; + return true; } public boolean isClosed() { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java index 07c51d4dfbf1..bd12643598c1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java @@ -31,6 +31,7 @@ import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.MaterializationProviderBuilderFactory; +import io.confluent.ksql.query.QueryError; import io.confluent.ksql.query.QueryErrorClassifier; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -40,6 +41,7 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -105,7 +107,7 @@ public PersistentQueryMetadataImpl( maxQueryErrorsQueueSize, retryBackoffInitialMs, retryBackoffMaxMs, - listener + new QueryListenerWrapper(listener, scalablePushRegistry) ); this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource"); this.schemas = requireNonNull(schemas, "schemas"); @@ -226,4 +228,32 @@ public synchronized void stop() { public Optional getScalablePushRegistry() { return scalablePushRegistry; } + + private static final class QueryListenerWrapper implements Listener { + private final Listener listener; + private final Optional scalablePushRegistry; + + private QueryListenerWrapper(final Listener listener, + final Optional scalablePushRegistry) { + this.listener = listener; + this.scalablePushRegistry = scalablePushRegistry; + } + + @Override + public void onError(final QueryMetadata queryMetadata, final QueryError error) { + this.listener.onError(queryMetadata, error); + scalablePushRegistry.ifPresent(ScalablePushRegistry::onError); + } + + @Override + public void onStateChange(final QueryMetadata queryMetadata, final State before, + final State after) { + this.listener.onStateChange(queryMetadata, before, after); + } + + @Override + public void onClose(final QueryMetadata queryMetadata) { + this.listener.onClose(queryMetadata); + } + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java index 987a728c58fe..f4bfa9e9fe42 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java @@ -97,7 +97,7 @@ public void setLimitHandler(final LimitHandler limitHandler) { @Override public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) { - // We don't do anything special here since the persistent query handles its own errors + onException(handler::handle); } @Override diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ProcessingQueueTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ProcessingQueueTest.java index 3b30fb6a5a52..7973c620cdfc 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ProcessingQueueTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ProcessingQueueTest.java @@ -63,4 +63,13 @@ public void shouldHitLimit() { assertThat(queue.poll(), nullValue()); assertThat(queue.hasDroppedRows(), is(true)); } + + @Test + public void shouldDefaultToFalseForHasError() { + // Given: + final ProcessingQueue queue = new ProcessingQueue(new QueryId("a")); + + // Then: + assertThat(queue.getHasError(),is(false)); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java index 841b4665df34..98be7ab0f624 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java @@ -96,7 +96,7 @@ public void shouldStopOnDroppedRows() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, scalablePushRegistry, pushDataSourceOperator, context); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); - when(pushDataSourceOperator.droppedRows()).thenReturn(false, true); + when(pushDataSourceOperator.droppedRows()).thenReturn(false, false, true); final BufferedPublisher> publisher = pushPhysicalPlan.execute(); final TestSubscriber> subscriber = new TestSubscriber<>(); @@ -124,6 +124,39 @@ public void shouldStopOnDroppedRows() throws InterruptedException { assertThat(pushPhysicalPlan.isClosed(), is(true)); } + @Test + public void shouldStopOnHasError() throws InterruptedException { + final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, + scalablePushRegistry, pushDataSourceOperator, context); + doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); + when(pushDataSourceOperator.hasError()).thenReturn(false, false, true); + + final BufferedPublisher> publisher = pushPhysicalPlan.execute(); + final TestSubscriber> subscriber = new TestSubscriber<>(); + publisher.subscribe(subscriber); + + context.owner().setPeriodic(50, timerId -> { + if (runnableCaptor.getValue() == null) { + return; + } + when(root.next()).thenReturn(ROW1, ROW2, null); + + runnableCaptor.getValue().run(); + runnableCaptor.getValue().run(); + + context.owner().cancelTimer(timerId); + }); + + while (subscriber.getError() == null) { + Thread.sleep(100); + } + + assertThat(subscriber.getError().getMessage(), containsString("Persistent query has error")); + assertThat(subscriber.getValues().size(), is(1)); + assertThat(subscriber.getValues().get(0), is(ROW1)); + assertThat(pushPhysicalPlan.isClosed(), is(true)); + } + public static class TestSubscriber implements Subscriber { private Subscription sub; diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java index f108335f2a88..c42b5ba8ee3d 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java @@ -188,4 +188,17 @@ public void shouldCreate_noApplicationServer() { // Then assertThat(registry.isPresent(), is(false)); } + + @Test + public void shouldCallOnErrorOnQueue() { + // Given + ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false); + registry.register(processingQueue); + + // When + registry.onError(); + + // Then: + verify(processingQueue).onError(); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java index 2ee169b3731c..db6242c856d8 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperatorTest.java @@ -35,6 +35,8 @@ public class PeekStreamOperatorTest { private TableRow row2; @Mock private Runnable newRowCallback; + @Mock + private ProcessingQueue processingQueue; @Test public void shouldGetRowsFromOperator() { @@ -57,4 +59,15 @@ public void shouldGetRowsFromOperator() { locator.close(); verify(registry, times(1)).unregister(processingQueue); } + + @Test + public void shouldDefaultToFalseForHasErrorOnQueue() { + // Given: + final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID); + // When: + locator.open(); + + // Then: + assertThat(locator.hasError(),is(false)); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index caa8d3225b06..bd8a1d10a33b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -30,6 +30,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationProvider; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.MaterializationProviderBuilderFactory; import io.confluent.ksql.query.QueryError; @@ -91,6 +92,8 @@ public class PersistentQueryMetadataTest { private ProcessingLogger processingLogger; @Mock private Listener listener; + @Mock + private ScalablePushRegistry scalablePushRegistry; private PersistentQueryMetadata query; @@ -125,7 +128,7 @@ public void setUp() { 0L, 0L, listener, - Optional.empty() + Optional.of(scalablePushRegistry) ); query.initialize();