diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index f60948665752..68fd6dbf56b1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -88,13 +88,6 @@ public interface KsqlExecutionContext { */ PreparedStatement prepare(ParsedStatement stmt); - /** - * Executes a statement using the engine's primary service context. - * - * @see #execute(ServiceContext, ConfiguredStatement) - */ - ExecuteResult execute(ConfiguredStatement statement); - /** * Execute the supplied statement, updating the meta store and registering any query. * diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 369584005e04..d88725b6ba3d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -159,13 +159,6 @@ public PreparedStatement prepare(final ParsedStatement stmt) { return primaryContext.prepare(stmt); } - @Override - public ExecuteResult execute( - final ConfiguredStatement statement - ) { - return execute(primaryContext.getServiceContext(), statement); - } - @Override public ExecuteResult execute( final ServiceContext serviceContext, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index 6c0ce14711c7..8bb8700fe007 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -79,13 +79,6 @@ public PreparedStatement prepare(final ParsedStatement stmt) { return engineContext.prepare(stmt); } - @Override - public ExecuteResult execute( - final ConfiguredStatement statement - ) { - return execute(engineContext.getServiceContext(), statement); - } - @Override public ExecuteResult execute( final ServiceContext serviceContext, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index f8aa0edb7aff..ee50a83b303f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -103,6 +103,7 @@ public class KsqlEngineTest { private KsqlEngine ksqlEngine; private ServiceContext serviceContext; + private ServiceContext sandboxServiceContext; @Spy private final FakeKafkaTopicClient topicClient = new FakeKafkaTopicClient(); private KsqlExecutionContext sandbox; @@ -122,6 +123,7 @@ public void setUp() { ); sandbox = ksqlEngine.createSandbox(serviceContext); + sandboxServiceContext = sandbox.getServiceContext(); } @After @@ -134,8 +136,14 @@ public void closeEngine() { public void shouldCreatePersistentQueries() { // When: final List queries - = KsqlEngineTestUtil.execute(ksqlEngine, "create table bar as select * from test2;" + - "create table foo as select * from test2;", KSQL_CONFIG, Collections.emptyMap()); + = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create table bar as select * from test2;" + + "create table foo as select * from test2;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: assertThat(queries, hasSize(2)); @@ -149,6 +157,7 @@ public void shouldCreatePersistentQueries() { public void shouldNotHaveRowTimeAndRowKeyColumnsInPersistentQueryValueSchema() { // When: final PersistentQueryMetadata query = (PersistentQueryMetadata) KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, "create table bar as select * from test2;", KSQL_CONFIG, @@ -167,8 +176,13 @@ public void shouldNotHaveRowTimeAndRowKeyColumnsInPersistentQueryValueSchema() { public void shouldThrowOnTerminateAsNotExecutable() { // Given: final PersistentQueryMetadata query = (PersistentQueryMetadata) KsqlEngineTestUtil - .execute(ksqlEngine, - "create table bar as select * from test2;", KSQL_CONFIG, Collections.emptyMap()).get(0); + .execute( + serviceContext, + ksqlEngine, + "create table bar as select * from test2;", + KSQL_CONFIG, + Collections.emptyMap()) + .get(0); expectedException.expect(KsqlStatementException.class); expectedException.expect(rawMessage(is("Statement not executable"))); @@ -176,7 +190,12 @@ public void shouldThrowOnTerminateAsNotExecutable() { // When: KsqlEngineTestUtil.execute( - ksqlEngine, "TERMINATE " + query.getQueryId() + ";", KSQL_CONFIG, Collections.emptyMap()); + serviceContext, + ksqlEngine, + "TERMINATE " + query.getQueryId() + ";", + KSQL_CONFIG, + Collections.emptyMap() + ); } @Test @@ -191,7 +210,7 @@ public void shouldExecuteInsertIntoStreamOnSandBox() { // When: final ExecuteResult result = sandbox - .execute(ConfiguredStatement.of( + .execute(sandboxServiceContext, ConfiguredStatement.of( sandbox.prepare(statements.get(1)), Collections.emptyMap(), KSQL_CONFIG)); @@ -203,7 +222,7 @@ public void shouldExecuteInsertIntoStreamOnSandBox() { @Test public void shouldThrowWhenExecutingInsertIntoTable() { KsqlEngineTestUtil.execute( - ksqlEngine, "create table bar as select * from test2;", KSQL_CONFIG, + serviceContext, ksqlEngine, "create table bar as select * from test2;", KSQL_CONFIG, Collections.emptyMap()); final ParsedStatement parsed = ksqlEngine.parse("insert into bar select * from test2;").get(0); @@ -220,8 +239,12 @@ public void shouldThrowWhenExecutingInsertIntoTable() { @Test public void shouldThrowOnInsertIntoStreamWithTableResult() { KsqlEngineTestUtil.execute( - ksqlEngine, "create stream bar as select itemid, orderid from orders;", KSQL_CONFIG, - Collections.emptyMap()); + serviceContext, + ksqlEngine, + "create stream bar as select itemid, orderid from orders;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: expectedException.expect(KsqlStatementException.class); @@ -233,6 +256,7 @@ public void shouldThrowOnInsertIntoStreamWithTableResult() { // When: KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, "insert into bar select itemid, count(*) from orders group by itemid;", KSQL_CONFIG, @@ -243,8 +267,12 @@ public void shouldThrowOnInsertIntoStreamWithTableResult() { @Test public void shouldThrowOnInsertIntoWithKeyMismatch() { KsqlEngineTestUtil.execute( - ksqlEngine, "create stream bar as select * from orders;", KSQL_CONFIG, - Collections.emptyMap()); + serviceContext, + ksqlEngine, + "create stream bar as select * from orders;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: expectedException.expect(KsqlStatementException.class); @@ -257,6 +285,7 @@ public void shouldThrowOnInsertIntoWithKeyMismatch() { // When: KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, "insert into bar select * from orders partition by itemid;", KSQL_CONFIG, @@ -268,8 +297,12 @@ public void shouldThrowOnInsertIntoWithKeyMismatch() { public void shouldThrowWhenInsertIntoSchemaDoesNotMatch() { // Given: KsqlEngineTestUtil.execute( - ksqlEngine, "create stream bar as select * from orders;", KSQL_CONFIG, - Collections.emptyMap()); + serviceContext, + ksqlEngine, + "create stream bar as select * from orders;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: expectedException.expect(KsqlStatementException.class); @@ -279,6 +312,7 @@ public void shouldThrowWhenInsertIntoSchemaDoesNotMatch() { // When: KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, "insert into bar select itemid from orders;", KSQL_CONFIG, @@ -290,12 +324,21 @@ public void shouldThrowWhenInsertIntoSchemaDoesNotMatch() { public void shouldExecuteInsertIntoStream() { // Given: KsqlEngineTestUtil.execute( - ksqlEngine, "create stream bar as select * from orders;", KSQL_CONFIG, - Collections.emptyMap()); + serviceContext, + ksqlEngine, + "create stream bar as select * from orders;", + KSQL_CONFIG, + Collections.emptyMap() + ); // When: final List queries = KsqlEngineTestUtil.execute( - ksqlEngine, "insert into bar select * from orders;", KSQL_CONFIG, Collections.emptyMap()); + serviceContext, + ksqlEngine, + "insert into bar select * from orders;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: assertThat(queries, hasSize(1)); @@ -304,7 +347,9 @@ public void shouldExecuteInsertIntoStream() { @Test public void shouldMaintainOrderOfReturnedQueries() { // When: - final List queries = KsqlEngineTestUtil.execute(ksqlEngine, + final List queries = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create stream foo as select * from orders;" + "create stream bar as select * from orders;", KSQL_CONFIG, Collections.emptyMap()); @@ -318,19 +363,36 @@ public void shouldMaintainOrderOfReturnedQueries() { @Test(expected = ParseFailedException.class) public void shouldFailToCreateQueryIfSelectingFromNonExistentEntity() { KsqlEngineTestUtil - .execute(ksqlEngine, "select * from bar;", KSQL_CONFIG, Collections.emptyMap()); + .execute( + serviceContext, + ksqlEngine, + "select * from bar;", + KSQL_CONFIG, + Collections.emptyMap() + ); } @Test(expected = ParseFailedException.class) public void shouldFailWhenSyntaxIsInvalid() { - KsqlEngineTestUtil.execute(ksqlEngine, "blah;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "blah;", + KSQL_CONFIG, + Collections.emptyMap() + ); } @Test public void shouldUpdateReferentialIntegrityTableCorrectly() { - KsqlEngineTestUtil.execute(ksqlEngine, "create table bar as select * from test2;" + - "create table foo as select * from test2;", KSQL_CONFIG, Collections - .emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create table bar as select * from test2;" + + "create table foo as select * from test2;", + KSQL_CONFIG, + Collections.emptyMap() + ); assertThat(metaStore.getQueriesWithSource(SourceName.of("TEST2")), equalTo(Utils.mkSet("CTAS_BAR_0", "CTAS_FOO_1"))); @@ -341,9 +403,14 @@ public void shouldUpdateReferentialIntegrityTableCorrectly() { @Test public void shouldFailIfReferentialIntegrityIsViolated() { // Given: - KsqlEngineTestUtil.execute(ksqlEngine, "create table bar as select * from test2;" + - "create table foo as select * from test2;", - KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create table bar as select * from test2;" + + "create table foo as select * from test2;", + KSQL_CONFIG, + Collections.emptyMap() + ); expectedException.expect(KsqlStatementException.class); expectedException.expect(rawMessage(is( @@ -354,7 +421,13 @@ public void shouldFailIfReferentialIntegrityIsViolated() { expectedException.expect(statementText(is("drop table foo;"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, "drop table foo;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "drop table foo;", + KSQL_CONFIG, + Collections.emptyMap() + ); } @Test @@ -371,21 +444,34 @@ public void shouldFailDDLStatementIfTopicDoesNotExist() { expectedException.expectMessage("Kafka topic does not exist: S1_NOTEXIST"); // When: - sandbox.execute(ConfiguredStatement.of(prepared, Collections.emptyMap(), KSQL_CONFIG)); + sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(prepared, Collections.emptyMap(), KSQL_CONFIG) + ); } @Test public void shouldDropTableIfAllReferencedQueriesTerminated() { // Given: - final QueryMetadata secondQuery = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata secondQuery = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create table bar as select * from test2;" + "create table foo as select * from test2;", - KSQL_CONFIG, Collections.emptyMap()).get(1); + KSQL_CONFIG, + Collections.emptyMap()) + .get(1); secondQuery.close(); // When: - KsqlEngineTestUtil.execute(ksqlEngine, "drop table foo;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "drop table foo;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: assertThat(metaStore.getSource(SourceName.of("foo")), nullValue()); @@ -421,7 +507,10 @@ public void shouldThrowFromSandBoxOnPrepareIfSourceTopicDoesNotExist() { + " WITH (KAFKA_TOPIC = 'i_do_not_exist', VALUE_FORMAT = 'JSON');"))); // When: - sandbox.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -436,7 +525,10 @@ public void shouldThrowFromExecuteIfSourceTopicDoesNotExist() { expectedException.expect(rawMessage(is("Kafka topic does not exist: i_do_not_exist"))); // When: - ksqlEngine.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + ksqlEngine.execute( + serviceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -464,10 +556,13 @@ public void shouldThrowIfSchemaNotPresent() { "create stream bar with (value_format='avro', kafka_topic='bar');"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create stream bar with (value_format='avro', kafka_topic='bar');", KSQL_CONFIG, - Collections.emptyMap()); + Collections.emptyMap() + ); } @Test @@ -495,10 +590,13 @@ public void shouldFailIfAvroSchemaNotEvolvable() { "CREATE TABLE T WITH(VALUE_FORMAT='AVRO') AS SELECT * FROM TEST2;"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "CREATE TABLE T WITH(VALUE_FORMAT='AVRO') AS SELECT * FROM TEST2;", KSQL_CONFIG, - Collections.emptyMap()); + Collections.emptyMap() + ); } @Test @@ -512,10 +610,13 @@ public void shouldNotFailIfAvroSchemaEvolvable() { givenTopicWithSchema("T", evolvableSchema); // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "CREATE TABLE T WITH(VALUE_FORMAT='AVRO') AS SELECT * FROM TEST2;", KSQL_CONFIG, - Collections.emptyMap()); + Collections.emptyMap() + ); // Then: assertThat(metaStore.getSource(SourceName.of("T")), is(notNullValue())); @@ -525,9 +626,12 @@ public void shouldNotFailIfAvroSchemaEvolvable() { public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { // Given: givenTopicsExist("BAR"); - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create table bar with (value_format = 'avro') as select * from test2;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, Collections.emptyMap() + ).get(0); query.close(); @@ -539,7 +643,13 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { schemaRegistryClient.register("BAR-value", schema); // When: - KsqlEngineTestUtil.execute(ksqlEngine, "DROP TABLE bar;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "DROP TABLE bar;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: assertThat(serviceContext.getTopicClient().isTopicExists("BAR"), equalTo(true)); @@ -549,9 +659,12 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { @Test public void shouldCleanUpInternalTopicsOnClose() { // Given: - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "select * from test1 EMIT CHANGES;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, Collections.emptyMap() + ).get(0); query.start(); @@ -565,9 +678,12 @@ public void shouldCleanUpInternalTopicsOnClose() { @Test public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() { // Given: - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create stream s1 with (value_format = 'avro') as select * from test1;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, Collections.emptyMap() + ).get(0); // When: query.close(); @@ -582,9 +698,13 @@ public void shouldRemovePersistentQueryFromEngineWhenClosed() { final int startingLiveQueries = ksqlEngine.numberOfLiveQueries(); final int startingPersistentQueries = ksqlEngine.getPersistentQueries().size(); - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create stream s1 with (value_format = 'avro') as select * from test1;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, + Collections.emptyMap() + ).get(0); // When: query.close(); @@ -600,9 +720,12 @@ public void shouldRemoveTransientQueryFromEngineWhenClosed() { // Given: final int startingLiveQueries = ksqlEngine.numberOfLiveQueries(); - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "select * from test1 EMIT CHANGES;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, Collections.emptyMap() + ).get(0); // When: query.close(); @@ -614,10 +737,13 @@ public void shouldRemoveTransientQueryFromEngineWhenClosed() { @Test public void shouldSetKsqlSinkForSinks() { // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "create stream s as select * from orders;" + "create table t as select itemid, count(*) from orders group by itemid;", - KSQL_CONFIG, Collections.emptyMap()); + KSQL_CONFIG, Collections.emptyMap() + ); // Then: assertThat(metaStore.getSource(SourceName.of("S")).getKsqlTopic().isKsqlSink(), is(true)); @@ -660,6 +786,7 @@ public void shouldHandleMultipleStatements() { { final PreparedStatement prepared = ksqlEngine.prepare(stmt); final ExecuteResult result = ksqlEngine.execute( + serviceContext, ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG)); result.getQuery().ifPresent(queries::add); return prepared; @@ -715,7 +842,10 @@ public void shouldThrowWhenExecutingDuplicateTable() { "CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;"))); // When: - ksqlEngine.execute(ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG)); + ksqlEngine.execute( + serviceContext, + ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -767,7 +897,10 @@ public void shouldThrowWhenExecutingDuplicateStream() { "CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;"))); // When: - ksqlEngine.execute(ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG)); + ksqlEngine.execute( + serviceContext, + ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -781,9 +914,12 @@ public void shouldThrowWhenExecutingQueriesIfCsasCreatesTable() { "CREATE STREAM FOO AS SELECT COUNT(ORDERID) FROM ORDERS GROUP BY ORDERID;"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "CREATE STREAM FOO AS SELECT COUNT(ORDERID) FROM ORDERS GROUP BY ORDERID;", - KSQL_CONFIG, Collections.emptyMap()); + KSQL_CONFIG, Collections.emptyMap() + ); } @Test @@ -797,9 +933,12 @@ public void shouldThrowWhenExecutingQueriesIfCtasCreatesStream() { "CREATE TABLE FOO AS SELECT * FROM ORDERS;"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "CREATE TABLE FOO AS SELECT * FROM ORDERS;", - KSQL_CONFIG, Collections.emptyMap()); + KSQL_CONFIG, Collections.emptyMap() + ); } @Test @@ -816,7 +955,10 @@ public void shouldThrowWhenTryExecuteCsasThatCreatesTable() { "CREATE STREAM FOO AS SELECT COUNT(ORDERID) FROM ORDERS GROUP BY ORDERID;"))); // When: - sandbox.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + sandbox.execute( + serviceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -832,7 +974,10 @@ public void shouldThrowWhenTryExecuteCtasThatCreatesStream() { + "Please use CREATE STREAM AS SELECT statement instead."))); // When: - sandbox.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + sandbox.execute( + serviceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); } @Test @@ -889,7 +1034,13 @@ public void shouldThrowOnNoneExecutableDdlStatement() { expectedException.expect(statementText(is("SHOW STREAMS;"))); // When: - KsqlEngineTestUtil.execute(ksqlEngine, "SHOW STREAMS;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "SHOW STREAMS;", + KSQL_CONFIG, + Collections.emptyMap() + ); } @Test @@ -909,6 +1060,7 @@ public void shouldNotUpdateMetaStoreDuringTryExecute() { // When: statements .forEach(stmt -> sandbox.execute( + sandboxServiceContext, ConfiguredStatement.of(sandbox.prepare(stmt), new HashMap<>(), KSQL_CONFIG))); // Then: @@ -933,7 +1085,10 @@ public void shouldNotCreateAnyTopicsDuringTryExecute() { // When: statements.forEach( - stmt -> sandbox.execute(ConfiguredStatement.of(sandbox.prepare(stmt), new HashMap<>(), KSQL_CONFIG))); + stmt -> sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(sandbox.prepare(stmt), new HashMap<>(), KSQL_CONFIG)) + ); // Then: assertThat("no topics should be created during a tryExecute call", @@ -947,11 +1102,14 @@ public void shouldNotIncrementQueryIdCounterDuringTryExecute() { final PreparedStatement statement = prepare(parse(sql).get(0)); // When: - sandbox.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); // Then: final List queries = KsqlEngineTestUtil - .execute(ksqlEngine, sql, KSQL_CONFIG, Collections.emptyMap()); + .execute(serviceContext, ksqlEngine, sql, KSQL_CONFIG, Collections.emptyMap()); assertThat("query id of actual execute should not be affected by previous tryExecute", ((PersistentQueryMetadata) queries.get(0)).getQueryId(), is(new QueryId("CTAS_FOO_0"))); } @@ -968,7 +1126,10 @@ public void shouldNotRegisterAnySchemasDuringSandboxExecute() throws Exception { final PreparedStatement prepared = prepare(statements.get(1)); // When: - sandbox.execute(ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG)); + sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG) + ); // Then: verify(schemaRegistryClient, never()).register(any(), any()); @@ -1003,7 +1164,10 @@ public void shouldRegisterPersistentQueriesOnlyInSandbox() { "create table bar as select * from test2;").get(0)); // When: - final ExecuteResult result = sandbox.execute(ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG)); + final ExecuteResult result = sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(prepared, new HashMap<>(), KSQL_CONFIG) + ); // Then: assertThat(result.getQuery(), is(not(Optional.empty()))); @@ -1021,7 +1185,10 @@ public void shouldExecuteDdlStatement() { prepare(parse("CREATE STREAM FOO (a int) WITH (kafka_topic='foo', value_format='json');").get(0)); // When: - final ExecuteResult result = sandbox.execute(ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG)); + final ExecuteResult result = sandbox.execute( + sandboxServiceContext, + ConfiguredStatement.of(statement, new HashMap<>(), KSQL_CONFIG) + ); // Then: assertThat(result.getCommandResult(), is(Optional.of("Stream created"))); @@ -1074,13 +1241,22 @@ public void shouldBeAbleToPrepareTerminateAndDrop() { @Test public void shouldIgnoreLegacyDeleteTopicPartOfDropCommand() { // Given: - final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, + final QueryMetadata query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, "CREATE STREAM FOO AS SELECT * FROM TEST1;", - KSQL_CONFIG, Collections.emptyMap()).get(0); + KSQL_CONFIG, Collections.emptyMap() + ).get(0); query.close(); // When: - KsqlEngineTestUtil.execute(ksqlEngine, "DROP STREAM FOO DELETE TOPIC;", KSQL_CONFIG, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "DROP STREAM FOO DELETE TOPIC;", + KSQL_CONFIG, + Collections.emptyMap() + ); // Then: verifyNoMoreInteractions(topicClient); @@ -1121,6 +1297,7 @@ private void givenStatementAlreadyExecuted( final ParsedStatement statement ) { ksqlEngine.execute( + serviceContext, ConfiguredStatement.of(ksqlEngine.prepare(statement), new HashMap<>(), KSQL_CONFIG)); sandbox = ksqlEngine.createSandbox(serviceContext); } @@ -1128,6 +1305,7 @@ private void givenStatementAlreadyExecuted( private void givenSqlAlreadyExecuted(final String sql) { parse(sql).forEach(stmt -> ksqlEngine.execute( + serviceContext, ConfiguredStatement.of(ksqlEngine.prepare(stmt), new HashMap<>(), KSQL_CONFIG))); sandbox = ksqlEngine.createSandbox(serviceContext); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index 431072ede56f..dc0e7e82b595 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -75,18 +75,20 @@ public static KsqlEngine createKsqlEngine( } public static List execute( + final ServiceContext serviceContext, final KsqlEngine engine, final String sql, final KsqlConfig ksqlConfig, final Map overriddenProperties ) { - return execute(engine, sql, ksqlConfig, overriddenProperties, Optional.empty()); + return execute(serviceContext, engine, sql, ksqlConfig, overriddenProperties, Optional.empty()); } /** * @param srClient if supplied, then schemas can be inferred from the schema registry. */ public static List execute( + final ServiceContext serviceContext, final KsqlEngine engine, final String sql, final KsqlConfig ksqlConfig, @@ -100,7 +102,8 @@ public static List execute( .map(DefaultSchemaInjector::new); return statements.stream() - .map(stmt -> execute(engine, stmt, ksqlConfig, overriddenProperties, schemaInjector)) + .map(stmt -> + execute(serviceContext, engine, stmt, ksqlConfig, overriddenProperties, schemaInjector)) .map(ExecuteResult::getQuery) .filter(Optional::isPresent) .map(Optional::get) @@ -109,6 +112,7 @@ public static List execute( @SuppressWarnings({"rawtypes","unchecked"}) private static ExecuteResult execute( + final ServiceContext serviceContext, final KsqlExecutionContext executionContext, final ParsedStatement stmt, final KsqlConfig ksqlConfig, @@ -125,7 +129,7 @@ private static ExecuteResult execute( final ConfiguredStatement reformatted = new SqlFormatInjector(executionContext).inject(withSchema); try { - return executionContext.execute(reformatted); + return executionContext.execute(serviceContext, reformatted); } catch (final KsqlStatementException e) { // use the original statement text in the exception so that tests // can easily check that the failed statement is the input statement diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index 41db0820ce51..3010831f3a57 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -158,9 +158,12 @@ private void execInitCreateStreamQueries() { final String messageStreamStr = String.format("CREATE STREAM %s (message varchar) WITH (value_format = 'json', " + "kafka_topic='%s');", messageLogStream, messageLogTopic); - KsqlEngineTestUtil.execute(ksqlEngine, ordersStreamStr, ksqlConfig, Collections.emptyMap()); - KsqlEngineTestUtil.execute(ksqlEngine, usersTableStr, ksqlConfig, Collections.emptyMap()); - KsqlEngineTestUtil.execute(ksqlEngine, messageStreamStr, ksqlConfig, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, ordersStreamStr, ksqlConfig, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, usersTableStr, ksqlConfig, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, messageStreamStr, ksqlConfig, Collections.emptyMap()); } @After @@ -238,7 +241,8 @@ public void testJsonStreamExtractorNested() { private void executePersistentQuery(final String queryString) { final QueryMetadata queryMetadata = KsqlEngineTestUtil - .execute(ksqlEngine, queryString, ksqlConfig, Collections.emptyMap()).get(0); + .execute(serviceContext, ksqlEngine, queryString, ksqlConfig, Collections.emptyMap()) + .get(0); queryMetadata.start(); queryId = ((PersistentQueryMetadata)queryMetadata).getQueryId(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index a18d84b0734c..2a751763c67a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -346,7 +346,13 @@ private void execInitCreateStreamQueries() { + "kafka_topic='%s' , " + "key='ordertime');", INPUT_STREAM, INPUT_TOPIC); - KsqlEngineTestUtil.execute(ksqlEngine, ordersStreamStr, ksqlConfig, Collections.emptyMap()); + KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + ordersStreamStr, + ksqlConfig, + Collections.emptyMap() + ); } private void executePersistentQuery(final String queryString, @@ -354,7 +360,7 @@ private void executePersistentQuery(final String queryString, final String query = String.format(queryString, params); final QueryMetadata queryMetadata = KsqlEngineTestUtil - .execute(ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); + .execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); queryMetadata.start(); queryId = ((PersistentQueryMetadata) queryMetadata).getQueryId(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index b8acf7524ea5..b34b3afa7a30 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -628,6 +628,7 @@ private void givenConfigWith(final String name, final Object value) { private List execute(final String sql) { return KsqlEngineTestUtil.execute( + serviceContext, ksqlEngine, sql, ksqlConfig, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index e858c4a6b469..e43a9c3b5acc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -477,7 +477,7 @@ static KsqlRestApplication buildApplication( restConfig.getCommandProducerProperties()); final StatementExecutor statementExecutor = - new StatementExecutor(ksqlEngine, hybridQueryIdGenerator); + new StatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator); final RootDocument rootDocument = new RootDocument(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java index 561ada6f5b2e..cb8d9978f161 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java @@ -169,7 +169,7 @@ private void processesQueryFile(final String queries) { final Injector injector = injectorFactory.apply(ksqlEngine, serviceContext); executeStatements( preparedStatements, - new StatementExecutor(ksqlEngine, injector, ksqlConfig) + new StatementExecutor(serviceContext, ksqlEngine, injector, ksqlConfig) ); ksqlEngine.getPersistentQueries().forEach(QueryMetadata::start); @@ -181,6 +181,7 @@ private void validateStatements(final List statements) { sandboxEngine, sandboxEngine.getServiceContext()); final StatementExecutor sandboxExecutor = new StatementExecutor( + sandboxEngine.getServiceContext(), sandboxEngine, injector, ksqlConfig @@ -261,16 +262,19 @@ private static final class StatementExecutor { private static final String SUPPORTED_STATEMENTS = generateSupportedMessage(); + private final ServiceContext serviceContext; private final KsqlExecutionContext executionContext; private final Map configOverrides = new HashMap<>(); private final KsqlConfig ksqlConfig; private final Injector injector; private StatementExecutor( + final ServiceContext serviceContext, final KsqlExecutionContext executionContext, final Injector injector, final KsqlConfig ksqlConfig ) { + this.serviceContext = requireNonNull(serviceContext, "serviceContext"); this.executionContext = requireNonNull(executionContext, "executionContext"); this.injector = requireNonNull(injector, "injector"); this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); @@ -333,11 +337,11 @@ private void handleUnsetProperty(final ConfiguredStatement statem } private void handleExecutableDdl(final ConfiguredStatement statement) { - executionContext.execute(statement); + executionContext.execute(serviceContext, statement); } private void handlePersistentQuery(final ConfiguredStatement statement) { - executionContext.execute(statement) + executionContext.execute(serviceContext, statement) .getQuery() .filter(q -> q instanceof PersistentQueryMetadata) .orElseThrow((() -> new KsqlStatementException( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index cb546f2ea296..5c7e116f8764 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -38,6 +38,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.util.QueryCapacityUtil; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; @@ -64,6 +65,7 @@ public class StatementExecutor implements KsqlConfigurable { private static final Logger log = LoggerFactory.getLogger(StatementExecutor.class); + private final ServiceContext serviceContext; private final KsqlEngine ksqlEngine; private final StatementParser statementParser; private final HybridQueryIdGenerator queryIdGenerator; @@ -76,10 +78,12 @@ private enum Mode { } public StatementExecutor( + final ServiceContext serviceContext, final KsqlEngine ksqlEngine, final HybridQueryIdGenerator hybridQueryIdGenerator ) { this( + serviceContext, ksqlEngine, new StatementParser(ksqlEngine), hybridQueryIdGenerator @@ -88,10 +92,12 @@ public StatementExecutor( @VisibleForTesting StatementExecutor( + final ServiceContext serviceContext, final KsqlEngine ksqlEngine, final StatementParser statementParser, final HybridQueryIdGenerator hybridQueryIdGenerator ) { + this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); this.queryIdGenerator = @@ -263,7 +269,7 @@ private String executeDdlStatement(final PreparedStatement statement, final C ConfiguredStatement.of(statement, command.getOverwriteProperties(), mergedConfig); return ksqlEngine - .execute(configured) + .execute(serviceContext, configured) .getCommandResult() .get(); } @@ -292,7 +298,7 @@ private void handleLegacyRunScript(final Command command, final Mode mode) { final PreparedStatement prepared = ksqlEngine.prepare(parsed); final ConfiguredStatement configured = ConfiguredStatement.of(prepared, overriddenProperties, ksqlConfig); - ksqlEngine.execute(configured) + ksqlEngine.execute(serviceContext, configured) .getQuery() .ifPresent(queries::add); } @@ -334,7 +340,7 @@ private PersistentQueryMetadata startQuery( queryIdGenerator.activateNewGenerator(offset); } - final QueryMetadata queryMetadata = ksqlEngine.execute(configured) + final QueryMetadata queryMetadata = ksqlEngine.execute(serviceContext, configured) .getQuery() .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index a6891e00fa3b..e1dfd1f2277d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -228,6 +228,8 @@ public class StandaloneExecutorTest { @Mock private ServiceContext serviceContext; @Mock + private ServiceContext sandBoxServiceContext; + @Mock private KafkaTopicClient kafkaTopicClient; @Mock private BiFunction injectorFactory; @@ -256,15 +258,16 @@ public void before() throws Exception { when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(ksqlEngine.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(ksqlEngine.execute(any())).thenReturn(ExecuteResult.of(persistentQuery)); + when(ksqlEngine.execute(any(), any())).thenReturn(ExecuteResult.of(persistentQuery)); when(ksqlEngine.createSandbox(any())).thenReturn(sandBox); when(sandBox.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(sandBox.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(sandBox.execute(any())).thenReturn(ExecuteResult.of("success")); - when(sandBox.execute(CSAS_CFG_WITH_TOPIC)) + when(sandBox.getServiceContext()).thenReturn(sandBoxServiceContext); + when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of("success")); + when(sandBox.execute(sandBoxServiceContext, CSAS_CFG_WITH_TOPIC)) .thenReturn(ExecuteResult.of(persistentQuery)); when(injectorFactory.apply(any(), any())).thenReturn(InjectorChain.of(sandBoxSchemaInjector, sandBoxTopicInjector)); @@ -445,7 +448,7 @@ public void shouldRunCsStatement() { standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(ConfiguredStatement.of(cs, emptyMap(), ksqlConfig)); + verify(ksqlEngine).execute(serviceContext, ConfiguredStatement.of(cs, emptyMap(), ksqlConfig)); } @Test @@ -460,7 +463,7 @@ public void shouldRunCtStatement() { standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(ConfiguredStatement.of(ct, emptyMap(), ksqlConfig)); + verify(ksqlEngine).execute(serviceContext, ConfiguredStatement.of(ct, emptyMap(), ksqlConfig)); } @Test @@ -479,6 +482,7 @@ public void shouldRunSetStatements() { // Then: verify(ksqlEngine).execute( + serviceContext, ConfiguredStatement.of( cs, ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), @@ -500,7 +504,7 @@ public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(ConfiguredStatement.of( + verify(ksqlEngine).execute(serviceContext, ConfiguredStatement.of( cs, ImmutableMap.of(), ksqlConfig @@ -527,7 +531,7 @@ public void shouldRunUnSetStatements() { standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(configured); + verify(ksqlEngine).execute(serviceContext, configured); } @Test @@ -538,14 +542,14 @@ public void shouldRunCsasStatements() { final ConfiguredStatement configured = ConfiguredStatement.of(csas, emptyMap(), ksqlConfig); givenQueryFileParsesTo(csas); - when(sandBox.execute(configured)) + when(sandBox.execute(sandBoxServiceContext, configured)) .thenReturn(ExecuteResult.of(persistentQuery)); // When: standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(configured); + verify(ksqlEngine).execute(serviceContext, configured); } @Test @@ -557,14 +561,14 @@ public void shouldRunCtasStatements() { givenQueryFileParsesTo(ctas); - when(sandBox.execute(configured)) + when(sandBox.execute(sandBoxServiceContext, configured)) .thenReturn(ExecuteResult.of(persistentQuery)); // When: standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(configured); + verify(ksqlEngine).execute(serviceContext, configured); } @Test @@ -576,14 +580,14 @@ public void shouldRunInsertIntoStatements() { givenQueryFileParsesTo(insertInto); - when(sandBox.execute(configured)) + when(sandBox.execute(sandBoxServiceContext, configured)) .thenReturn(ExecuteResult.of(persistentQuery)); // When: standaloneExecutor.start(); // Then: - verify(ksqlEngine).execute(configured); + verify(ksqlEngine).execute(serviceContext, configured); } @Test @@ -591,7 +595,7 @@ public void shouldThrowIfExecutingPersistentQueryDoesNotReturnQuery() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any())) + when(sandBox.execute(any(), any())) .thenReturn(ExecuteResult.of("well, this is unexpected.")); expectedException.expect(KsqlException.class); @@ -606,7 +610,7 @@ public void shouldThrowIfExecutingPersistentQueryReturnsNonPersistentMetaData() // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any())) + when(sandBox.execute(any(), any())) .thenReturn(ExecuteResult.of(nonPersistentQueryMd)); expectedException.expect(KsqlException.class); @@ -628,7 +632,7 @@ public void shouldThrowIfParseThrows() { @Test(expected = RuntimeException.class) public void shouldThrowIfExecuteThrows() { // Given: - when(ksqlEngine.execute(any())).thenThrow(new RuntimeException("Boom!")); + when(ksqlEngine.execute(any(), any())).thenThrow(new RuntimeException("Boom!")); // When: standaloneExecutor.start(); @@ -668,7 +672,7 @@ public void shouldStartQueries() { public void shouldNotStartValidationPhaseQueries() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any())).thenReturn(ExecuteResult.of(sandBoxQuery)); + when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of(sandBoxQuery)); // When: standaloneExecutor.start(); @@ -689,9 +693,9 @@ public void shouldOnlyPrepareNextStatementOncePreviousStatementHasBeenExecuted() // Then: final InOrder inOrder = inOrder(ksqlEngine); inOrder.verify(ksqlEngine).prepare(PARSED_STMT_0); - inOrder.verify(ksqlEngine).execute(CFG_STMT_0); + inOrder.verify(ksqlEngine).execute(serviceContext, CFG_STMT_0); inOrder.verify(ksqlEngine).prepare(PARSED_STMT_1); - inOrder.verify(ksqlEngine).execute(CFG_STMT_1); + inOrder.verify(ksqlEngine).execute(serviceContext, CFG_STMT_1); } @Test @@ -729,8 +733,8 @@ public void shouldSupportSchemaInference() { standaloneExecutor.start(); // Then: - verify(sandBox).execute(CFG_0_WITH_SCHEMA); - verify(ksqlEngine).execute(CFG_1_WITH_SCHEMA); + verify(sandBox).execute(sandBoxServiceContext, CFG_0_WITH_SCHEMA); + verify(ksqlEngine).execute(serviceContext, CFG_1_WITH_SCHEMA); } @Test @@ -745,7 +749,7 @@ public void shouldSupportTopicInference() { standaloneExecutor.start(); // Then: - verify(sandBox).execute(CSAS_CFG_WITH_TOPIC); + verify(sandBox).execute(sandBoxServiceContext, CSAS_CFG_WITH_TOPIC); } private void givenExecutorWillFailOnNoQueries() { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 9cf9727c256c..d705ae0132bf 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -186,7 +186,11 @@ private class KsqlServer { } ); - this.statementExecutor = new StatementExecutor(ksqlEngine, hybridQueryIdGenerator); + this.statementExecutor = new StatementExecutor( + serviceContext, + ksqlEngine, + hybridQueryIdGenerator + ); this.commandRunner = new CommandRunner( statementExecutor, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index 878a03b881dd..e96b2cb82f88 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -139,8 +139,18 @@ public void setUp() { final StatementParser statementParser = new StatementParser(ksqlEngine); - statementExecutor = new StatementExecutor(ksqlEngine, statementParser, hybridQueryIdGenerator); - statementExecutorWithMocks = new StatementExecutor(mockEngine, mockParser, mockQueryIdGenerator); + statementExecutor = new StatementExecutor( + serviceContext, + ksqlEngine, + statementParser, + hybridQueryIdGenerator + ); + statementExecutorWithMocks = new StatementExecutor( + serviceContext, + mockEngine, + mockParser, + mockQueryIdGenerator + ); statementExecutor.configure(ksqlConfig); statementExecutorWithMocks.configure(ksqlConfig); @@ -171,7 +181,12 @@ public void shouldThrowOnConfigureIfAppServerNotSet() { @Test(expected = IllegalStateException.class) public void shouldThrowOnHandleStatementIfNotConfigured() { // Given: - statementExecutor = new StatementExecutor(mockEngine, mockParser, mockQueryIdGenerator); + statementExecutor = new StatementExecutor( + serviceContext, + mockEngine, + mockParser, + mockQueryIdGenerator + ); // When: statementExecutor.handleStatement(queuedCommand); @@ -180,7 +195,12 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { @Test(expected = IllegalStateException.class) public void shouldThrowOnHandleRestoreIfNotConfigured() { // Given: - statementExecutor = new StatementExecutor(mockEngine, mockParser, mockQueryIdGenerator); + statementExecutor = new StatementExecutor( + serviceContext, + mockEngine, + mockParser, + mockQueryIdGenerator + ); // When: statementExecutor.handleRestore(queuedCommand); @@ -229,7 +249,7 @@ public void shouldBuildQueriesWithPersistedConfig() { final PreparedStatement ddlStatement = realParser.parseSingleStatement(ddlText); final ConfiguredStatement configuredStatement = ConfiguredStatement.of(ddlStatement, emptyMap(), originalConfig); - ksqlEngine.execute(configuredStatement); + ksqlEngine.execute(serviceContext, configuredStatement); final PreparedStatement csasStatement = realParser.parseSingleStatement(statementText); @@ -254,7 +274,7 @@ public void shouldBuildQueriesWithPersistedConfig() { expect(mockParser.parseSingleStatement(statementText)).andReturn(csasStatement); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eq(configuredCsas))) + expect(mockEngine.execute(eq(serviceContext), eq(configuredCsas))) .andReturn(ExecuteResult.of(mockQueryMetadata)); mockQueryMetadata.start(); expectLastCall(); @@ -461,7 +481,7 @@ private PersistentQueryMetadata mockReplayCSAS( .andReturn(csas); expect(mockMetaStore.getSource(SourceName.of(name))).andStubReturn(null); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eqConfigured(csas))) + expect(mockEngine.execute(eq(serviceContext), eqConfigured(csas))) .andReturn(ExecuteResult.of(mockQuery)); return mockQuery; } @@ -484,7 +504,7 @@ private PersistentQueryMetadata mockReplayRunScript( expect(mockEngine.parse(eq(queryStatement))).andReturn(parsedStatements); expect(mockEngine.prepare(parsedStatements.get(0))) .andReturn((PreparedStatement)preparedStatement); - expect(mockEngine.execute(eqConfigured(preparedStatement))) + expect(mockEngine.execute(eq(serviceContext), eqConfigured(preparedStatement))) .andReturn(ExecuteResult.of(mockQuery)); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); return mockQuery; @@ -528,7 +548,10 @@ public void shouldCascade4Dot1DropStreamCommand() { expectLastCall(); expect(mockEngine - .execute(eqConfigured(PreparedStatement.of("DROP", mockDropStream)))) + .execute( + eq(serviceContext), + eqConfigured(PreparedStatement.of("DROP", mockDropStream))) + ) .andReturn(ExecuteResult.of("SUCCESS")); replayAll(); @@ -563,7 +586,7 @@ public void shouldNotCascadeDropStreamCommand() { final DropStream mockDropStream = mockDropStream("foo"); final PreparedStatement statement = PreparedStatement.of(drop, mockDropStream); - expect(mockEngine.execute(eqConfigured(statement))) + expect(mockEngine.execute(eq(serviceContext), eqConfigured(statement))) .andReturn(ExecuteResult.of("SUCCESS")); replayAll(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 9781addd59ff..cd6a41c3d0f1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1845,7 +1845,7 @@ public void shouldThrowServerErrorOnFailedToDistribute() { private Answer executeAgainstEngine(final String sql) { return invocation -> { - KsqlEngineTestUtil.execute(ksqlEngine, sql, ksqlConfig, emptyMap()); + KsqlEngineTestUtil.execute(serviceContext, ksqlEngine, sql, ksqlConfig, emptyMap()); return null; }; } @@ -1889,8 +1889,13 @@ private void givenMockEngine() { private List createQueries( final String sql, final Map overriddenProperties) { - return KsqlEngineTestUtil.execute(ksqlEngine, sql, ksqlConfig, overriddenProperties) - .stream() + return KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + sql, + ksqlConfig, + overriddenProperties + ).stream() .map(PersistentQueryMetadata.class::cast) .collect(Collectors.toList()); } @@ -2009,7 +2014,13 @@ private void validateQueryDescription( final Map overriddenProperties, final KsqlEntity entity) { final QueryMetadata queryMetadata = KsqlEngineTestUtil - .execute(ksqlEngine, ksqlQueryString, ksqlConfig, overriddenProperties).get(0); + .execute( + serviceContext, + ksqlEngine, + ksqlQueryString, + ksqlConfig, + overriddenProperties) + .get(0); validateQueryDescription(queryMetadata, overriddenProperties, entity); }