diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java index cde64ac00e2f..f9184ec3bad4 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java @@ -17,9 +17,11 @@ import static io.confluent.ksql.util.KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX; import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static io.confluent.ksql.util.KsqlRequestConfig.KSQL_REQUEST_CONFIG_PROPERTY_PREFIX; import com.google.common.collect.ImmutableList; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlRequestConfig; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -39,6 +41,7 @@ public class KsqlConfigResolver implements ConfigResolver { private static final ConfigDef CONSUMER_CONFIG_DEF = getConfigDef(ConsumerConfig.class); private static final ConfigDef PRODUCER_CONFIG_DEF = getConfigDef(ProducerConfig.class); private static final ConfigDef KSQL_CONFIG_DEF = KsqlConfig.CURRENT_DEF; + private static final ConfigDef REQUEST_CONFIG_DEF = KsqlRequestConfig.CURRENT_DEF; private static final List STREAM_CONFIG_DEFS = ImmutableList.of( new PrefixedConfig(StreamsConfig.CONSUMER_PREFIX, CONSUMER_CONFIG_DEF), @@ -50,7 +53,9 @@ public class KsqlConfigResolver implements ConfigResolver { @Override public Optional resolve(final String propertyName, final boolean strict) { - if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX) + if (propertyName.startsWith(KSQL_REQUEST_CONFIG_PROPERTY_PREFIX)) { + return resolveRequestConfig(propertyName); + } else if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX) && !propertyName.startsWith(KSQL_STREAMS_PREFIX)) { return resolveKsqlConfig(propertyName); } @@ -100,6 +105,15 @@ private static Optional resolveKsqlConfig(final String propertyName) return Optional.empty(); } + private static Optional resolveRequestConfig(final String propertyName) { + final Optional possibleItem = resolveConfig("", REQUEST_CONFIG_DEF, propertyName); + if (possibleItem.isPresent()) { + return possibleItem; + } + + return Optional.empty(); + } + private static Optional resolveConfig( final String prefix, final ConfigDef def, diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java new file mode 100644 index 000000000000..3b29751bfa86 --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlRequestConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import java.util.Map; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; + +public class KsqlRequestConfig extends AbstractConfig { + + public static final String KSQL_REQUEST_CONFIG_PROPERTY_PREFIX = "request.ksql."; + + public static final ConfigDef CURRENT_DEF = buildConfigDef(); + + public static final String KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING = + "request.ksql.query.pull.skip.forwarding"; + public static final Boolean KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DEFAULT = false; + private static final String KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DOC = + "Controls whether a ksql host forwards a pull query request to another host"; + + private static ConfigDef buildConfigDef() { + final ConfigDef configDef = new ConfigDef() + .define( + KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, + Type.BOOLEAN, + KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DEFAULT, + ConfigDef.Importance.MEDIUM, + KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DOC + ); + return configDef; + } + + public KsqlRequestConfig(final Map props) { + super(CURRENT_DEF, props); + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 554a15d642e1..e1f2170d305c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -152,7 +152,7 @@ public void execute( final InsertValues insertValues = statement.getStatement(); final MetaStore metaStore = executionContext.getMetaStore(); final KsqlConfig config = statement.getConfig() - .cloneWithPropertyOverwrite(statement.getOverrides()); + .cloneWithPropertyOverwrite(statement.getConfigOverrides()); final DataSource dataSource = getDataSource(config, metaStore, insertValues); @@ -210,7 +210,7 @@ private ProducerRecord buildRecord( final InsertValues insertValues = statement.getStatement(); final KsqlConfig config = statement.getConfig() - .cloneWithPropertyOverwrite(statement.getOverrides()); + .cloneWithPropertyOverwrite(statement.getConfigOverrides()); try { final RowData row = extractRow( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 7cdec5c1c471..475d5bed585c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -176,7 +176,8 @@ public KsqlPlan plan( final ConfiguredStatement statement ) { return EngineExecutor - .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) + .create( + primaryContext, serviceContext, statement.getConfig(), statement.getConfigOverrides()) .plan(statement); } @@ -198,7 +199,7 @@ public ExecuteResult execute( serviceContext, ConfiguredKsqlPlan.of( plan(serviceContext, statement), - statement.getOverrides(), + statement.getConfigOverrides(), statement.getConfig() ) ); @@ -211,7 +212,11 @@ public TransientQueryMetadata executeQuery( ) { try { final TransientQueryMetadata query = EngineExecutor - .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) + .create( + primaryContext, + serviceContext, + statement.getConfig(), + statement.getConfigOverrides()) .executeQuery(statement); registerQuery(query); return query; @@ -230,7 +235,12 @@ public QueryMetadata executeQuery( final Consumer rowConsumer ) { final QueryMetadata query = EngineExecutor - .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) + .create( + primaryContext, + serviceContext, + statement.getConfig(), + statement.getConfigOverrides() + ) .executeQuery(statement, rowConsumer); registerQuery(query); return query; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index 9a3a73c91c41..c570952bd70f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -101,7 +101,7 @@ public KsqlPlan plan( engineContext, serviceContext, statement.getConfig(), - statement.getOverrides() + statement.getConfigOverrides() ).plan(statement); } @@ -127,7 +127,7 @@ public ExecuteResult execute( serviceContext, ConfiguredKsqlPlan.of( plan(serviceContext, statement), - statement.getOverrides(), + statement.getConfigOverrides(), statement.getConfig() ) ); @@ -142,7 +142,7 @@ public TransientQueryMetadata executeQuery( engineContext, serviceContext, statement.getConfig(), - statement.getOverrides() + statement.getConfigOverrides() ).executeQuery(statement); } @@ -153,7 +153,7 @@ public QueryMetadata executeQuery(final ServiceContext serviceContext, engineContext, serviceContext, statement.getConfig(), - statement.getOverrides() + statement.getConfigOverrides() ).executeQuery(statement, rowConsumer); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java index d238524c7a45..9916cce00496 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java @@ -96,7 +96,7 @@ private Optional> forCreateStatement( final CreateSource withSchema = addSchemaFields(statement, valueSchema); final PreparedStatement prepared = buildPreparedStatement(withSchema); final ConfiguredStatement configured = ConfiguredStatement - .of(prepared, statement.getOverrides(), statement.getConfig()); + .of(prepared, statement.getConfigOverrides(), statement.getConfig()); return Optional.of(configured); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java b/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java index 0e0e0fbbf0b8..cf113e5b257a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java @@ -35,7 +35,9 @@ public final class ConfiguredStatement { private final PreparedStatement statement; @EffectivelyImmutable - private final ImmutableMap overrides; + private final ImmutableMap configOverrides; + @EffectivelyImmutable + private final ImmutableMap requestProperties; private final KsqlConfig config; public static ConfiguredStatement of( @@ -46,13 +48,36 @@ public static ConfiguredStatement of( return new ConfiguredStatement<>(statement, overrides, config); } + public static ConfiguredStatement of( + final PreparedStatement statement, + final Map overrides, + final Map requestProperties, + final KsqlConfig config + ) { + return new ConfiguredStatement<>(statement, overrides, requestProperties, config); + } + private ConfiguredStatement( final PreparedStatement statement, - final Map overrides, + final Map configOverrides, final KsqlConfig config ) { this.statement = requireNonNull(statement, "statement"); - this.overrides = ImmutableMap.copyOf(requireNonNull(overrides, "overrides")); + this.configOverrides = ImmutableMap.copyOf(requireNonNull(configOverrides, "overrides")); + this.config = requireNonNull(config, "config"); + this.requestProperties = ImmutableMap.of(); + } + + private ConfiguredStatement( + final PreparedStatement statement, + final Map configOverrides, + final Map requestProperties, + final KsqlConfig config + ) { + this.statement = requireNonNull(statement, "statement"); + this.configOverrides = ImmutableMap.copyOf(requireNonNull(configOverrides, "overrides")); + this.requestProperties = ImmutableMap.copyOf(requireNonNull( + requestProperties, "serverProperties")); this.config = requireNonNull(config, "config"); } @@ -69,8 +94,12 @@ public String getStatementText() { return statement.getStatementText(); } - public Map getOverrides() { - return overrides; + public Map getConfigOverrides() { + return configOverrides; + } + + public Map getRequestProperties() { + return requestProperties; } public KsqlConfig getConfig() { @@ -78,18 +107,22 @@ public KsqlConfig getConfig() { } public ConfiguredStatement withConfig(final KsqlConfig config) { - return new ConfiguredStatement<>(this.statement, this.overrides, config); + return new ConfiguredStatement<>(this.statement, this.configOverrides, config); } - public ConfiguredStatement withProperties(final Map properties) { + public ConfiguredStatement withConfigOverrides(final Map properties) { return new ConfiguredStatement<>(this.statement, properties, this.config); } + public ConfiguredStatement withRequestProperties(final Map properties) { + return new ConfiguredStatement<>(this.statement, this.configOverrides, properties, this.config); + } + public ConfiguredStatement withStatement( final String statementText, final T statement) { return new ConfiguredStatement<>( - PreparedStatement.of(statementText, statement), this.overrides, this.config); + PreparedStatement.of(statementText, statement), this.configOverrides, this.config); } @Override @@ -102,20 +135,21 @@ public boolean equals(final Object o) { } final ConfiguredStatement that = (ConfiguredStatement) o; return Objects.equals(statement, that.statement) - && Objects.equals(overrides, that.overrides) + && Objects.equals(configOverrides, that.configOverrides) && Objects.equals(config, that.config); } @Override public int hashCode() { - return Objects.hash(statement, overrides, config); + return Objects.hash(statement, configOverrides, config); } @Override public String toString() { return "ConfiguredStatement{" + "statement=" + statement - + ", overrides=" + overrides + + ", configOverrides=" + configOverrides + + ", requestProperties=" + requestProperties + ", config=" + config + '}'; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index b05adde6dcbf..b6f897be5575 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -137,7 +137,7 @@ private ConfiguredStatement injectForCreateAsSelec final TopicProperties.Builder topicPropertiesBuilder ) { final String prefix = - statement.getOverrides().getOrDefault( + statement.getConfigOverrides().getOrDefault( KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG, statement.getConfig().getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG)) .toString(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java b/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java index 09f56947de60..c69fe900a0de 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java @@ -50,7 +50,7 @@ public void shouldTakeDefensiveCopyOfProperties() { props.put("other", "thing"); // Then: - assertThat(statement.getOverrides(), is(ImmutableMap.of("this", "that"))); + assertThat(statement.getConfigOverrides(), is(ImmutableMap.of("this", "that"))); } @Test @@ -60,6 +60,6 @@ public void shouldReturnImmutableProperties() { .of(prepared, new HashMap<>(), CONFIG); // Then: - assertThat(statement.getOverrides(), is(instanceOf(ImmutableMap.class))); + assertThat(statement.getConfigOverrides(), is(instanceOf(ImmutableMap.class))); } } \ No newline at end of file diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java index 954a1943b53a..4ae5e246f90e 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java @@ -46,7 +46,8 @@ public RestResponse makeKsqlRequest(final URI serverEndPoint, fi public RestResponse> makeQueryRequest( final URI serverEndPoint, final String sql, - final Map properties + final Map configOverrides, + final Map requestProperties ) { throw new UnsupportedOperationException("KSQL client is disabled"); } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java index 658a3ebee040..d9826965abfe 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java @@ -34,10 +34,19 @@ RestResponse makeKsqlRequest( String sql ); + /** + * Send pull query request to remote Ksql server. + * @param serverEndPoint the remote destination + * @param sql the pull query statement + * @param configOverrides the config overrides provided by the client + * @param requestProperties the request metadata provided by the server + * @return the result of pull query evaluation + */ RestResponse> makeQueryRequest( URI serverEndPoint, String sql, - Map properties + Map configOverrides, + Map requestProperties ); /** diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java index 94e08051cf82..6024ced941bf 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java @@ -407,7 +407,7 @@ private Optional planStatement(final ParsedStatement stmt) { return Optional.of( ConfiguredKsqlPlan.of( rewritePlan(plan), - reformatted.getOverrides(), + reformatted.getConfigOverrides(), reformatted.getConfig() ) ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index d2c62fa746be..473bda03d832 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -235,8 +235,7 @@ public static SourceName getCommandsStreamName() { this.routingFilterFactory = initializeRoutingFilterFactory( ksqlConfigNoPort, heartbeatAgent, lagReportingAgent); - this.pullQueryExecutor = new PullQueryExecutor( - ksqlEngine, heartbeatAgent, routingFilterFactory); + this.pullQueryExecutor = new PullQueryExecutor(ksqlEngine, routingFilterFactory); } @Override @@ -663,7 +662,7 @@ static KsqlRestApplication buildApplication( heartbeatAgent, lagReportingAgent); final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( - ksqlEngine, heartbeatAgent, routingFilterFactory); + ksqlEngine, routingFilterFactory); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -790,18 +789,27 @@ private static Optional initializeLagReportingAgent( } private static RoutingFilterFactory initializeRoutingFilterFactory( - final KsqlConfig ksqlConfig, + final KsqlConfig configWithApplicationServer, final Optional heartbeatAgent, final Optional lagReportingAgent) { return (routingOptions, hosts, active, applicationQueryId, storeName, partition) -> { final ImmutableList.Builder filterBuilder = ImmutableList.builder(); - if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS)) { - filterBuilder.add(new ActiveHostFilter(active)); + + // If the lookup is for a forwarded request, apply only MaxLagFilter for localhost + if (routingOptions.skipForwardRequest()) { + MaximumLagFilter.create(lagReportingAgent, routingOptions, hosts, applicationQueryId, + storeName, partition) + .map(filterBuilder::add); + } else { + if (!configWithApplicationServer.getBoolean( + KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS)) { + filterBuilder.add(new ActiveHostFilter(active)); + } + filterBuilder.add(new LivenessFilter(heartbeatAgent)); + MaximumLagFilter.create(lagReportingAgent, routingOptions, hosts, applicationQueryId, + storeName, partition) + .map(filterBuilder::add); } - filterBuilder.add(new LivenessFilter(heartbeatAgent)); - MaximumLagFilter.create(lagReportingAgent, routingOptions, hosts, applicationQueryId, - storeName, partition) - .map(filterBuilder::add); return new RoutingFilters(filterBuilder.build()); }; } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java index 51ddf183df4d..5ab66eb2c734 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java @@ -79,7 +79,7 @@ public static Command of(final ConfiguredKsqlPlan configuredPlan) { public static Command of(final ConfiguredStatement configuredStatement) { return new Command( configuredStatement.getStatementText(), - configuredStatement.getOverrides(), + configuredStatement.getConfigOverrides(), configuredStatement.getConfig().getAllConfigPropsWithSecretsObfuscated(), Optional.empty() ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ValidatedCommandFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ValidatedCommandFactory.java index b157bc48c5f9..ebee2d9a5bbb 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ValidatedCommandFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/ValidatedCommandFactory.java @@ -103,11 +103,11 @@ private Command createForPlannedQuery( serviceContext, ConfiguredKsqlPlan.of( plan, - statement.getOverrides(), + statement.getConfigOverrides(), statement.getConfig() ) ); return Command.of( - ConfiguredKsqlPlan.of(plan, statement.getOverrides(), statement.getConfig())); + ConfiguredKsqlPlan.of(plan, statement.getConfigOverrides(), statement.getConfig())); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java index b8aea19688b9..aa5681a6a770 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java @@ -105,14 +105,14 @@ private static QueryDescription explainStatement( metadata = sandbox.executeQuery( serviceContext, ConfiguredStatement.of( - preparedStatement, explain.getOverrides(), explain.getConfig()).cast() + preparedStatement, explain.getConfigOverrides(), explain.getConfig()).cast() ); } else { metadata = sandbox .execute( serviceContext, ConfiguredStatement - .of(preparedStatement, explain.getOverrides(), explain.getConfig())) + .of(preparedStatement, explain.getConfigOverrides(), explain.getConfig())) .getQuery() .orElseThrow(() -> new IllegalStateException("The provided statement did not run a ksql query")); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java index e5a76a99f22a..51c2ef35ccb9 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java @@ -76,7 +76,7 @@ private static List mergedProperties( final List mergedProperties = new ArrayList<>(); statement.getConfig() - .cloneWithPropertyOverwrite(statement.getOverrides()) + .cloneWithPropertyOverwrite(statement.getConfigOverrides()) .getAllConfigPropsWithSecretsObfuscated() .forEach((key, value) -> mergedProperties.add(new Property(key, "KSQL", value))); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 06cc270af2d2..d107b83a5652 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -75,7 +75,6 @@ import io.confluent.ksql.rest.entity.StreamedRow.Header; import io.confluent.ksql.rest.entity.TableRowsEntity; import io.confluent.ksql.rest.entity.TableRowsEntityFactory; -import io.confluent.ksql.rest.server.HeartbeatAgent; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer; import io.confluent.ksql.schema.ksql.FormatOptions; @@ -89,6 +88,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlRequestConfig; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -126,16 +126,13 @@ public final class PullQueryExecutor { VALID_WINDOW_BOUNDS_TYPES.toString(); private final KsqlExecutionContext executionContext; - private final Optional heartbeatAgent; private final RoutingFilterFactory routingFilterFactory; public PullQueryExecutor( final KsqlExecutionContext executionContext, - final Optional heartbeatAgent, final RoutingFilterFactory routingFilterFactory ) { this.executionContext = Objects.requireNonNull(executionContext, "executionContext"); - this.heartbeatAgent = Objects.requireNonNull(heartbeatAgent, "heartbeatAgent"); this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); } @@ -217,7 +214,7 @@ private TableRowsEntity handlePullQuery( final PullQueryContext pullQueryContext ) { final RoutingOptions routingOptions = new ConfigRoutingOptions( - statement.getConfig(), statement.getOverrides()); + statement.getConfig(), statement.getConfigOverrides(), statement.getRequestProperties()); // Get active and standby nodes for this key final Locator locator = pullQueryContext.mat.locator(); @@ -252,7 +249,6 @@ private TableRowsEntity routeQuery( final ServiceContext serviceContext, final PullQueryContext pullQueryContext ) { - if (node.isLocal()) { LOG.debug("Query {} executed locally at host {} at timestamp {}.", statement.getStatementText(), node.location(), System.currentTimeMillis()); @@ -318,6 +314,63 @@ TableRowsEntity queryRowsLocally( ); } + @VisibleForTesting + TableRowsEntity forwardTo( + final KsqlNode owner, + final ConfiguredStatement statement, + final ServiceContext serviceContext + ) { + // Add skip forward flag to properties + final Map requestProperties = ImmutableMap.of( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true); + final RestResponse> response = serviceContext + .getKsqlClient() + .makeQueryRequest( + owner.location(), + statement.getStatementText(), + statement.getConfigOverrides(), + requestProperties + ); + + if (response.isErroneous()) { + throw new KsqlServerException("Forwarding attempt failed: " + response.getErrorMessage()); + } + + final List streamedRows = response.getResponse(); + if (streamedRows.isEmpty()) { + throw new KsqlServerException("Invalid empty response from forwarding call"); + } + + // Temporary code to convert from QueryStream to TableRowsEntity + // Tracked by: https://github.com/confluentinc/ksql/issues/3865 + final Header header = streamedRows.get(0).getHeader() + .orElseThrow(() -> new KsqlServerException("Expected header in first row")); + + final ImmutableList.Builder> rows = ImmutableList.builder(); + + for (final StreamedRow row : streamedRows.subList(1, streamedRows.size())) { + if (row.getErrorMessage().isPresent()) { + throw new KsqlStatementException( + row.getErrorMessage().get().getMessage(), + statement.getStatementText() + ); + } + + if (!row.getRow().isPresent()) { + throw new KsqlServerException("Unexpected forwarding response"); + } + + rows.add(row.getRow().get().values()); + } + + return new TableRowsEntity( + statement.getStatementText(), + header.getQueryId(), + header.getSchema(), + rows.build() + ); + } + private QueryId uniqueQueryId() { return new QueryId("query_" + System.currentTimeMillis()); } @@ -742,7 +795,7 @@ private List> handleSelects( } final KsqlConfig ksqlConfig = statement.getConfig() - .cloneWithPropertyOverwrite(statement.getOverrides()); + .cloneWithPropertyOverwrite(statement.getConfigOverrides()); final SelectValueMapper select = SelectValueMapperFactory.create( analysis.getSelectExpressions(), @@ -853,55 +906,6 @@ private SourceName getSourceName(final ImmutableAnalysis analysis) { return source.getName(); } - @VisibleForTesting - TableRowsEntity forwardTo( - final KsqlNode owner, - final ConfiguredStatement statement, - final ServiceContext serviceContext - ) { - final RestResponse> response = serviceContext - .getKsqlClient() - .makeQueryRequest(owner.location(), statement.getStatementText(), statement.getOverrides()); - - if (response.isErroneous()) { - throw new KsqlServerException("Proxy attempt failed: " + response.getErrorMessage()); - } - - final List streamedRows = response.getResponse(); - if (streamedRows.isEmpty()) { - throw new KsqlServerException("Invalid empty response from proxy call"); - } - - // Temporary code to convert from QueryStream to TableRowsEntity - // Tracked by: https://github.com/confluentinc/ksql/issues/3865 - final Header header = streamedRows.get(0).getHeader() - .orElseThrow(() -> new KsqlServerException("Expected header in first row")); - - final ImmutableList.Builder> rows = ImmutableList.builder(); - - for (final StreamedRow row : streamedRows.subList(1, streamedRows.size())) { - if (row.getErrorMessage().isPresent()) { - throw new KsqlStatementException( - row.getErrorMessage().get().getMessage(), - statement.getStatementText() - ); - } - - if (!row.getRow().isPresent()) { - throw new KsqlServerException("Unexpected proxy response"); - } - - rows.add(row.getRow().get().values()); - } - - return new TableRowsEntity( - statement.getStatementText(), - header.getQueryId(), - header.getSchema(), - rows.build() - ); - } - private KsqlException notMaterializedException(final SourceName sourceTable) { return new KsqlException("'" + sourceTable.toString(FormatOptions.noEscape()) + "' is not materialized. " @@ -968,23 +972,43 @@ public Optional visitQualifiedColumnReference( private static final class ConfigRoutingOptions implements RoutingOptions { private final KsqlConfig ksqlConfig; - private final Map overrides; + private final Map configOverrides; + private final Map requestProperties; - ConfigRoutingOptions(final KsqlConfig ksqlConfig, final Map overrides) { + ConfigRoutingOptions( + final KsqlConfig ksqlConfig, + final Map configOverrides, + final Map requestProperties + ) { this.ksqlConfig = ksqlConfig; - this.overrides = overrides; + this.configOverrides = configOverrides; + this.requestProperties = requestProperties; } private long getLong(final String key) { - if (overrides.containsKey(key)) { - return (Long) overrides.get(key); + if (configOverrides.containsKey(key)) { + return (Long) configOverrides.get(key); } return ksqlConfig.getLong(key); } + private boolean getForwardedFlag(final String key) { + if (requestProperties.containsKey(key)) { + return (Boolean) requestProperties.get(key); + } + return KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DEFAULT; + } + @Override public long getOffsetLagAllowed() { return getLong(KsqlConfig.KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_CONFIG); } + + @Override + public boolean skipForwardRequest() { + return getForwardedFlag(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING); + } + + } } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 5ee904fbff8f..3d476bb6bffb 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -250,7 +250,7 @@ public Response handleKsqlStatements( SandboxedServiceContext.create(securityContext.getServiceContext()), statements, new SessionProperties( - request.getStreamsProperties(), + request.getConfigOverrides(), localHost, localUrl ), @@ -261,7 +261,7 @@ public Response handleKsqlStatements( securityContext, statements, new SessionProperties( - request.getStreamsProperties(), + request.getConfigOverrides(), localHost, localUrl ) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index e0b64742218a..2a4bb66f4a2e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -201,21 +201,22 @@ private Response handleStatement( return handlePullQuery( securityContext.getServiceContext(), queryStmt, - request.getStreamsProperties() + request.getConfigOverrides(), + request.getRequestProperties() ); } return handlePushQuery( securityContext.getServiceContext(), queryStmt, - request.getStreamsProperties() + request.getConfigOverrides() ); } if (statement.getStatement() instanceof PrintTopic) { return handlePrintTopic( securityContext.getServiceContext(), - request.getStreamsProperties(), + request.getConfigOverrides(), (PreparedStatement) statement); } @@ -234,10 +235,11 @@ private Response handleStatement( private Response handlePullQuery( final ServiceContext serviceContext, final PreparedStatement statement, - final Map streamsProperties + final Map configOverrides, + final Map requestProperties ) { final ConfiguredStatement configured = - ConfiguredStatement.of(statement,streamsProperties, ksqlConfig); + ConfiguredStatement.of(statement, configOverrides, requestProperties, ksqlConfig); final TableRowsEntity entity = pullQueryExecutor .execute(configured, serviceContext); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 3ecb6c0a1eed..9193ddcc14be 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -361,7 +361,7 @@ private KsqlRequest parseRequest(final Session session) { throw new IllegalArgumentException("\"ksql\" field of \"request\" must be populated"); } // To validate props: - request.getStreamsProperties(); + request.getConfigOverrides(); return request; } catch (final Exception e) { throw new IllegalArgumentException("Error parsing request: " + e.getMessage(), e); @@ -387,7 +387,7 @@ private void validateKafkaAuthorization(final Statement statement) { @SuppressWarnings({"unused"}) private void handleQuery(final RequestContext info, final Query query) { - final Map clientLocalProperties = info.request.getStreamsProperties(); + final Map clientLocalProperties = info.request.getConfigOverrides(); final WebSocketSubscriber streamSubscriber = new WebSocketSubscriber<>(info.session, mapper); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java index d1df27264224..60fc85c07cf8 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java @@ -77,14 +77,15 @@ public RestResponse makeKsqlRequest( public RestResponse> makeQueryRequest( final URI serverEndPoint, final String sql, - final Map properties + final Map configOverrides, + final Map requestProperties ) { final KsqlTarget target = sharedClient .target(serverEndPoint) - .properties(properties); + .properties(configOverrides); final RestResponse> resp = getTarget(target, authHeader) - .postQueryRequest(sql, Optional.empty()); + .postQueryRequest(sql, requestProperties, Optional.empty()); if (resp.isErroneous()) { return RestResponse.erroneous(resp.getStatusCode(), resp.getErrorMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java index 4bfa4ef9907a..1b1c285710ea 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java @@ -60,7 +60,8 @@ public RestResponse makeKsqlRequest( final URI serverEndpoint, final String sql ) { - final KsqlRequest request = new KsqlRequest(sql, Collections.emptyMap(), null); + final KsqlRequest request = new KsqlRequest( + sql, Collections.emptyMap(), Collections.emptyMap(), null); final Response response = ksqlResource.handleKsqlStatements(securityContext, request); final Code statusCode = HttpStatus.getCode(response.getStatus()); @@ -76,7 +77,8 @@ public RestResponse makeKsqlRequest( public RestResponse> makeQueryRequest( final URI serverEndpoint, final String sql, - final Map properties + final Map configOverrides, + final Map requestProperties ) { throw new UnsupportedOperationException(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java b/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java index 88a05ca4aeb8..16d5149eaecc 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java @@ -104,7 +104,7 @@ public static Matcher> configured( return new TypeSafeMatcher>() { @Override protected boolean matchesSafely(final ConfiguredStatement item) { - return Objects.equals(properties, item.getOverrides()) + return Objects.equals(properties, item.getConfigOverrides()) && Objects.equals(config, item.getConfig()); } @@ -124,7 +124,7 @@ public static Matcher> configured( @Override protected boolean matchesSafely(final ConfiguredStatement item) { return statement.matches(PreparedStatement.of(item.getStatementText(), item.getStatement())) - && Objects.equals(properties, item.getOverrides()) + && Objects.equals(properties, item.getConfigOverrides()) && Objects.equals(config, item.getConfig()); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index ddf8771cfaa8..748e6e2e5888 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -36,6 +36,7 @@ import java.net.URI; import java.nio.charset.Charset; import java.util.Base64; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -131,6 +132,7 @@ static String rawRestQueryRequest( final KsqlRequest request = new KsqlRequest( sql, ImmutableMap.of(), + Collections.emptyMap(), cmdSeqNum.orElse(null) ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 900972d2c2ed..c3f72f2ee239 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -228,7 +228,7 @@ public void shouldCreateLogStreamThroughKsqlResource() { // Then: verify(ksqlResource).handleKsqlStatements( securityContextArgumentCaptor.capture(), - eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), null)) + eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), Collections.emptyMap(), null)) ); assertThat(securityContextArgumentCaptor.getValue().getUserPrincipal(), is(Optional.empty())); assertThat(securityContextArgumentCaptor.getValue().getServiceContext(), is(serviceContext)); @@ -246,7 +246,7 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() { // Then: verify(ksqlResource, never()).handleKsqlStatements( securityContext, - new KsqlRequest(logCreateStatement, Collections.emptyMap(), null) + new KsqlRequest(logCreateStatement, Collections.emptyMap(), Collections.emptyMap(), null) ); } @@ -262,7 +262,7 @@ public void shouldStartCommandStoreAndCommandRunnerBeforeCreatingLogStream() { inOrder.verify(commandRunner).start(); inOrder.verify(ksqlResource).handleKsqlStatements( securityContextArgumentCaptor.capture(), - eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), null)) + eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), Collections.emptyMap(), null)) ); assertThat(securityContextArgumentCaptor.getValue().getUserPrincipal(), is(Optional.empty())); assertThat(securityContextArgumentCaptor.getValue().getServiceContext(), is(serviceContext)); @@ -278,7 +278,7 @@ public void shouldCreateLogTopicBeforeSendingCreateStreamRequest() { inOrder.verify(topicClient).createTopic(eq(LOG_TOPIC_NAME), anyInt(), anyShort()); inOrder.verify(ksqlResource).handleKsqlStatements( securityContextArgumentCaptor.capture(), - eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), null)) + eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), Collections.emptyMap(), null)) ); assertThat(securityContextArgumentCaptor.getValue().getUserPrincipal(), is(Optional.empty())); assertThat(securityContextArgumentCaptor.getValue().getServiceContext(), is(serviceContext)); @@ -316,7 +316,7 @@ public void shouldSendCreateStreamRequestBeforeSettingReady() { final InOrder inOrder = Mockito.inOrder(ksqlResource, serverState); verify(ksqlResource).handleKsqlStatements( securityContextArgumentCaptor.capture(), - eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), null)) + eq(new KsqlRequest(logCreateStatement, Collections.emptyMap(), Collections.emptyMap(), null)) ); assertThat(securityContextArgumentCaptor.getValue().getUserPrincipal(), is(Optional.empty())); assertThat(securityContextArgumentCaptor.getValue().getServiceContext(), is(serviceContext)); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 0e0eaabc1217..03a5db1d5029 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -244,7 +244,7 @@ void executeCommands() { void submitCommands(final String ...statements) { for (final String statement : statements) { final Response response = ksqlResource.handleKsqlStatements(securityContext, - new KsqlRequest(statement, Collections.emptyMap(), null)); + new KsqlRequest(statement, Collections.emptyMap(), Collections.emptyMap(), null)); assertThat(response.getStatus(), equalTo(200)); executeCommands(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java index 83d5ab7b683c..986ff43f95ef 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java @@ -73,7 +73,7 @@ public void shouldListPropertiesWithOverrides() { // When: final PropertiesList properties = (PropertiesList) CustomExecutors.LIST_PROPERTIES.execute( engine.configure("LIST PROPERTIES;") - .withProperties(ImmutableMap.of("ksql.streams.auto.offset.reset", "latest")), + .withConfigOverrides(ImmutableMap.of("ksql.streams.auto.offset.reset", "latest")), mock(SessionProperties.class), engine.getEngine(), engine.getServiceContext() diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index f21135354de1..e40a98b43b48 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -37,7 +37,6 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Optional; import org.eclipse.jetty.http.HttpStatus.Code; import org.junit.Rule; import org.junit.Test; @@ -71,7 +70,7 @@ public void shouldThrowExceptionIfConfigDisabled() { engine.getKsqlConfig() ); PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( - engine.getEngine(), Optional.empty(), ROUTING_FILTER_FACTORY); + engine.getEngine(), ROUTING_FILTER_FACTORY); // Then: expectedException.expect(KsqlException.class); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 021354c97834..ac60fb5ae3a8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -200,8 +200,8 @@ public class KsqlResourceTest { private static final KsqlRequest VALID_EXECUTABLE_REQUEST = new KsqlRequest( "CREATE STREAM S AS SELECT * FROM test_stream;", ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), - 0L - ); + emptyMap(), + 0L); private static final LogicalSchema SINGLE_FIELD_SCHEMA = LogicalSchema.builder() .valueColumn(ColumnName.of("val"), SqlTypes.STRING) .build(); @@ -397,7 +397,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { // When: ksqlResource.handleKsqlStatements( securityContext, - new KsqlRequest("query", Collections.emptyMap(), null) + new KsqlRequest("query", emptyMap(), emptyMap(), null) ); } @@ -736,7 +736,7 @@ public void shouldDistributeWithStreamsProperties() { verify(commandStore).enqueueCommand( any(), argThat(is(commandWithOverwrittenProperties( - VALID_EXECUTABLE_REQUEST.getStreamsProperties()))), + VALID_EXECUTABLE_REQUEST.getConfigOverrides()))), any(Producer.class) ); } @@ -1424,8 +1424,10 @@ public void shouldUnsetProperty() { // When: final CommandStatusEntity result = makeSingleRequest( - new KsqlRequest("UNSET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "';\n" - + csas, localOverrides, null), + new KsqlRequest("UNSET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "';\n" + csas, + localOverrides, + emptyMap(), + null), CommandStatusEntity.class); // Then: @@ -1523,7 +1525,8 @@ public void shouldListPropertiesWithOverrides() { // When: final PropertiesList props = makeSingleRequest( - new KsqlRequest("list properties;", overrides, null), PropertiesList.class); + new KsqlRequest("list properties;", overrides, emptyMap(), null), + PropertiesList.class); // Then: assertThat( @@ -1986,7 +1989,7 @@ private KsqlErrorMessage makeFailingRequestWithSequenceNumber( final String ksql, final Long seqNum, final Code errorCode) { - return makeFailingRequest(new KsqlRequest(ksql, emptyMap(), seqNum), errorCode); + return makeFailingRequest(new KsqlRequest(ksql, emptyMap(), emptyMap(), seqNum), errorCode); } private KsqlErrorMessage makeFailingRequest(final KsqlRequest ksqlRequest, final Code errorCode) { @@ -2015,7 +2018,7 @@ private T makeSingleRequestWithSequenceNumber( final Long seqNum, final Class expectedEntityType) { return makeSingleRequest( - new KsqlRequest(sql, emptyMap(), seqNum), expectedEntityType); + new KsqlRequest(sql, emptyMap(), emptyMap(), seqNum), expectedEntityType); } private T makeSingleRequest( @@ -2039,7 +2042,7 @@ private List makeMultipleRequest( final Map props, final Class expectedEntityType ) { - final KsqlRequest request = new KsqlRequest(sql, props, null); + final KsqlRequest request = new KsqlRequest(sql, props, emptyMap(), null); return makeMultipleRequest(request, expectedEntityType); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 28f69dc3b53f..235374f4f8d3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -181,7 +181,7 @@ public void setup() { securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext); pullQueryExecutor = new PullQueryExecutor( - mockKsqlEngine, Optional.empty(), ROUTING_FILTER_FACTORY); + mockKsqlEngine, ROUTING_FILTER_FACTORY); testResource = new StreamedQueryResource( mockKsqlEngine, mockStatementParser, @@ -230,7 +230,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { // When: testResource.streamQuery( securityContext, - new KsqlRequest("query", Collections.emptyMap(), null) + new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null) ); } @@ -250,7 +250,7 @@ public void shouldReturn400OnBadStatement() { // When: testResource.streamQuery( securityContext, - new KsqlRequest("query", Collections.emptyMap(), null) + new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null) ); } @@ -259,7 +259,7 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { // When: testResource.streamQuery( securityContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) + new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null) ); // Then: @@ -271,7 +271,7 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { // When: testResource.streamQuery( securityContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), 3L) + new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L) ); // Then: @@ -296,7 +296,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb // When: testResource.streamQuery( securityContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), 3L) + new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L) ); } @@ -310,7 +310,7 @@ public void shouldNotCreateExternalClientsForPullQuery() { // When: testResource.streamQuery( securityContext, - new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null) + new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null) ); // Then: @@ -332,7 +332,7 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { // When: final Response response = testResource.streamQuery( securityContext, - new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), null) + new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null) ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -400,7 +400,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { final Response response = testResource.streamQuery( securityContext, - new KsqlRequest(queryString, requestStreamsProperties, null) + new KsqlRequest(queryString, requestStreamsProperties, Collections.emptyMap(), null) ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -539,7 +539,7 @@ public void shouldUpdateTheLastRequestTime() { /// When: testResource.streamQuery( securityContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) + new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null) ); // Then: @@ -558,7 +558,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() // When: final Response response = testResource.streamQuery( securityContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) + new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null) ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -581,7 +581,7 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx // When: final Response response = testResource.streamQuery( securityContext, - new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), null) + new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null) ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -627,7 +627,7 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { // When: testResource.streamQuery( securityContext, - new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), null) + new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null) ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 4be64c1ba604..06242ad58c89 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -102,16 +102,25 @@ public class WSQueryEndpointTest { private static final ObjectMapper OBJECT_MAPPER = JsonMapper.INSTANCE.mapper; - private static final KsqlRequest VALID_REQUEST = new KsqlRequest("test-sql", - ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), null); - - private static final KsqlRequest ANOTHER_REQUEST = new KsqlRequest("other-sql", - ImmutableMap.of(), null); + private static final KsqlRequest VALID_REQUEST = new KsqlRequest( + "test-sql", + ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), + Collections.emptyMap(), + null); + + private static final KsqlRequest ANOTHER_REQUEST = new KsqlRequest( + "other-sql", + ImmutableMap.of(), + Collections.emptyMap(), + null); private static final long SEQUENCE_NUMBER = 2L; private static final KsqlRequest REQUEST_WITHOUT_SEQUENCE_NUMBER = VALID_REQUEST; - private static final KsqlRequest REQUEST_WITH_SEQUENCE_NUMBER = new KsqlRequest("test-sql", - ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), SEQUENCE_NUMBER); + private static final KsqlRequest REQUEST_WITH_SEQUENCE_NUMBER = new KsqlRequest( + "test-sql", + ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), + Collections.emptyMap(), + SEQUENCE_NUMBER); private static final String VALID_VERSION = Versions.KSQL_V1_WS; private static final String[] NO_VERSION_PROPERTY = null; @@ -344,7 +353,7 @@ public void shouldReturnErrorOnInvalidStreamProperty() throws Exception { // Given: final String jsonRequest = "{" + "\"ksql\":\"sql\"," - + "\"streamsProperties\":{" + + "\"configOverrides\":{" + "\"unknown-property\":true" + "}}"; @@ -389,7 +398,7 @@ public void shouldHandlePushQuery() { // Then: final ConfiguredStatement configuredStatement = ConfiguredStatement.of( PreparedStatement.of(VALID_REQUEST.getKsql(), query), - VALID_REQUEST.getStreamsProperties(), + VALID_REQUEST.getConfigOverrides(), ksqlConfig); verify(pushQueryPublisher).start( @@ -436,7 +445,7 @@ public void shouldHandlePullQuery() { // Then: final ConfiguredStatement configuredStatement = ConfiguredStatement.of( PreparedStatement.of(VALID_REQUEST.getKsql(), query), - VALID_REQUEST.getStreamsProperties(), + VALID_REQUEST.getConfigOverrides(), ksqlConfig); verify(pullQueryPublisher).start( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java index 409b11f33b4e..c36041d55e3d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClientTest.java @@ -40,7 +40,7 @@ public class ServerInternalKsqlClientTest { private static final String KSQL_STATEMENT = "awesome ksql request;"; private static final KsqlRequest EXPECTED_REQUEST = - new KsqlRequest(KSQL_STATEMENT, Collections.emptyMap(), null); + new KsqlRequest(KSQL_STATEMENT, Collections.emptyMap(), Collections.emptyMap(), null); @Mock private KsqlResource ksqlResource; diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java index 52d2f55288a9..b96aeec4c885 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java +++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java @@ -34,6 +34,7 @@ import java.net.URI; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -140,7 +141,8 @@ public RestResponse> makeQueryRequest(final String ksql, if (properties != null) { target = target.properties(properties); } - return target.postQueryRequest(ksql, Optional.ofNullable(commandSeqNum)); + return target.postQueryRequest( + ksql, Collections.emptyMap(), Optional.ofNullable(commandSeqNum)); } public RestResponse> makePrintTopicRequest( diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java index 6214c2707a1a..b6be87dd4595 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java +++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java @@ -40,6 +40,7 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.net.SocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -135,18 +136,19 @@ public RestResponse postKsqlRequest( ) { return post( KSQL_PATH, - createKsqlRequest(ksql, previousCommandSeqNum), + createKsqlRequest(ksql, Collections.emptyMap(), previousCommandSeqNum), r -> deserialize(r.getBody(), KsqlEntityList.class) ); } public RestResponse> postQueryRequest( final String ksql, + final Map serverProperties, final Optional previousCommandSeqNum ) { return post( QUERY_PATH, - createKsqlRequest(ksql, previousCommandSeqNum), + createKsqlRequest(ksql, Collections.emptyMap(), previousCommandSeqNum), KsqlTarget::toRows ); } @@ -168,11 +170,13 @@ public RestResponse> postPrintTopicRequest( private KsqlRequest createKsqlRequest( final String ksql, + final Map serverProperties, final Optional previousCommandSeqNum ) { return new KsqlRequest( ksql, localProperties.toMap(), + serverProperties, previousCommandSeqNum.orElse(null) ); } @@ -216,7 +220,8 @@ private RestResponse> executeQueryRequestWithStreamRespon final Optional previousCommandSeqNum, final Function mapper ) { - final KsqlRequest ksqlRequest = createKsqlRequest(ksql, previousCommandSeqNum); + final KsqlRequest ksqlRequest = createKsqlRequest( + ksql, Collections.emptyMap(), previousCommandSeqNum); final AtomicReference> pubRef = new AtomicReference<>(); return executeSync(HttpMethod.POST, QUERY_PATH, ksqlRequest, resp -> pubRef.get(), (resp, vcf) -> { diff --git a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java index 9e39cdbc6d03..28baec0af3fc 100644 --- a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java +++ b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientTest.java @@ -54,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -112,7 +113,7 @@ public void shouldSendKsqlRequest() { assertThat(server.getPath(), is("/ksql")); assertThat(server.getHeaders().get("Accept"), is("application/json")); assertThat(getKsqlRequest(), - is(new KsqlRequest(ksql, properties, 123L))); + is(new KsqlRequest(ksql, properties, Collections.emptyMap(), 123L))); } @Test @@ -161,7 +162,7 @@ public void shouldOverrideProperties() { target.postKsqlRequest("some ksql", Optional.of(123L)); // Then: - assertThat(getKsqlRequest().getStreamsProperties(), is(props)); + assertThat(getKsqlRequest().getConfigOverrides(), is(props)); } @Test @@ -300,14 +301,15 @@ public void shouldPostQueryRequest() { // When: KsqlTarget target = ksqlClient.target(serverUri); - RestResponse> response = target.postQueryRequest(sql, Optional.of(321L)); + RestResponse> response = target.postQueryRequest( + sql, Collections.emptyMap(), Optional.of(321L)); // Then: assertThat(server.getHttpMethod(), is(HttpMethod.POST)); assertThat(server.getPath(), is("/query")); assertThat(server.getHeaders().get("Accept"), is("application/json")); - assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L))); assertThat(response.get(), is(expectedResponse)); } @@ -330,7 +332,7 @@ public void shouldPostQueryRequestStreamed() throws Exception { assertThat(server.getPath(), is("/query")); assertThat(server.getHeaders().get("Accept"), is("application/json")); - assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L))); List rows = getElementsFromPublisher(numRows, response.getResponse()); assertThat(rows, is(expectedResponse)); @@ -355,7 +357,7 @@ public void shouldPostQueryRequestStreamedWithLimit() throws Exception { assertThat(server.getPath(), is("/query")); assertThat(server.getHeaders().get("Accept"), is("application/json")); - assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L))); List rows = getElementsFromPublisher(numRows + 1, response.getResponse()); assertThat(rows, is(expectedResponse)); @@ -374,7 +376,7 @@ public void shouldCloseConnectionWhenQueryStreamIsClosed() throws Exception { .postQueryRequestStreamed(sql, Optional.of(321L)); // Then: - assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, 321L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(sql, properties, Collections.emptyMap(), 321L))); // When: response.getResponse().close(); @@ -401,7 +403,7 @@ public void shouldExecutePrintTopic() throws Exception { assertThat(server.getPath(), is("/query")); assertThat(server.getHeaders().get("Accept"), is("application/json")); - assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, 123L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, Collections.emptyMap(), 123L))); List lines = getElementsFromPublisher(numRows, response.getResponse()); assertThat(lines, is(expectedResponse)); @@ -420,7 +422,7 @@ public void shouldCloseConnectionWhenPrintTopicPublisherIsClosed() throws Except .postPrintTopicRequest(command, Optional.of(123L)); // Then: - assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, 123L))); + assertThat(getKsqlRequest(), is(new KsqlRequest(command, properties, Collections.emptyMap(), 123L))); // When: response.getResponse().close(); diff --git a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientUtilTest.java b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientUtilTest.java index 6fa7522ea9f2..cc8b749a52cf 100644 --- a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientUtilTest.java +++ b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/KsqlClientUtilTest.java @@ -31,6 +31,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.json.JsonObject; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -170,15 +171,15 @@ public void shouldSerialiseDeserialise() { // Given: Map props = new HashMap<>(); props.put("auto.offset.reset", "latest"); - KsqlRequest request = new KsqlRequest("some ksql", props, 21345L); + KsqlRequest request = new KsqlRequest("some ksql", props, Collections.emptyMap(), 21345L); // When: Buffer buff = KsqlClientUtil.serialize(request); // Then: assertThat(buff, is(notNullValue())); - String expectedJson = "{\"ksql\":\"some ksql\",\"streamsProperties\":{\"auto.offset.reset\":\"" - + "latest\"},\"commandSequenceNumber\":21345}"; + String expectedJson = "{\"ksql\":\"some ksql\",\"configOverrides\":{\"auto.offset.reset\":\"" + + "latest\"},\"requestProperties\":{},\"commandSequenceNumber\":21345}"; assertThat(new JsonObject(buff), is(new JsonObject(expectedJson))); // When: diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java index fdbc871d4a2e..5b15fb66b827 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java @@ -37,19 +37,24 @@ public class KsqlRequest { private static final PropertyParser PROPERTY_PARSER = new LocalPropertyParser(); private final String ksql; - private final ImmutableMap streamsProperties; + private final ImmutableMap configOverrides; + private final ImmutableMap requestProperties; private final Optional commandSequenceNumber; @JsonCreator public KsqlRequest( @JsonProperty("ksql") final String ksql, - @JsonProperty("streamsProperties") final Map streamsProperties, + @JsonProperty("configOverrides") final Map configOverrides, + @JsonProperty("requestProperties") final Map requestProperties, @JsonProperty("commandSequenceNumber") final Long commandSequenceNumber ) { this.ksql = ksql == null ? "" : ksql; - this.streamsProperties = streamsProperties == null + this.configOverrides = configOverrides == null ? ImmutableMap.of() - : ImmutableMap.copyOf(serializeClassValues(streamsProperties)); + : ImmutableMap.copyOf(serializeClassValues(configOverrides)); + this.requestProperties = requestProperties == null + ? ImmutableMap.of() + : ImmutableMap.copyOf(serializeClassValues(requestProperties)); this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber); } @@ -57,8 +62,12 @@ public String getKsql() { return ksql; } - public Map getStreamsProperties() { - return coerceTypes(streamsProperties); + public Map getConfigOverrides() { + return coerceTypes(configOverrides); + } + + public Map getRequestProperties() { + return coerceTypes(requestProperties); } public Optional getCommandSequenceNumber() { @@ -77,20 +86,22 @@ public boolean equals(final Object o) { final KsqlRequest that = (KsqlRequest) o; return Objects.equals(ksql, that.ksql) - && Objects.equals(streamsProperties, that.streamsProperties) + && Objects.equals(configOverrides, that.configOverrides) + && Objects.equals(requestProperties, that.requestProperties) && Objects.equals(commandSequenceNumber, that.commandSequenceNumber); } @Override public int hashCode() { - return Objects.hash(ksql, streamsProperties, commandSequenceNumber); + return Objects.hash(ksql, configOverrides, requestProperties, commandSequenceNumber); } @Override public String toString() { return "KsqlRequest{" + "ksql='" + ksql + '\'' - + ", streamsProperties=" + streamsProperties + + ", configOverrides=" + configOverrides + + ", requestProperties=" + requestProperties + ", commandSequenceNumber=" + commandSequenceNumber + '}'; } @@ -142,4 +153,4 @@ private static String listToString(final List value) { .map(e -> e == null ? null : e.toString()) .collect(Collectors.joining(",")); } -} +} \ No newline at end of file diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 5a5d76c15e60..fc1dd5de6d71 100644 --- a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -28,6 +28,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlRequestConfig; import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -45,55 +46,73 @@ public class KsqlRequestTest { private static final ObjectMapper OBJECT_MAPPER = JsonMapper.INSTANCE.mapper; private static final String A_JSON_REQUEST = "{" + "\"ksql\":\"sql\"," - + "\"streamsProperties\":{" + + "\"configOverrides\":{" + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + TimestampExtractor.class.getCanonicalName() + "\"" + + "}," + + "\"requestProperties\":{" + + "\"" + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING + "\":true" + "}}"; private static final String A_JSON_REQUEST_WITH_COMMAND_NUMBER = "{" + "\"ksql\":\"sql\"," - + "\"streamsProperties\":{" + + "\"configOverrides\":{" + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + TimestampExtractor.class.getCanonicalName() + "\"" + "}," + + "\"requestProperties\":{" + + "\"" + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING + "\":true" + + "}," + "\"commandSequenceNumber\":2}"; private static final String A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER = "{" + "\"ksql\":\"sql\"," - + "\"streamsProperties\":{" + + "\"configOverrides\":{" + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + TimestampExtractor.class.getCanonicalName() + "\"" + "}," + + "\"requestProperties\":{" + + "\"" + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING + "\":true" + + "}," + "\"commandSequenceNumber\":null}"; private static final ImmutableMap SOME_PROPS = ImmutableMap.of( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class ); + private static final ImmutableMap SOME_REQUEST_PROPS = ImmutableMap.of( + KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true + ); private static final long SOME_COMMAND_NUMBER = 2L; - private static final KsqlRequest A_REQUEST = new KsqlRequest("sql", SOME_PROPS, null); + private static final KsqlRequest A_REQUEST = new KsqlRequest( + "sql", SOME_PROPS, SOME_REQUEST_PROPS, null); private static final KsqlRequest A_REQUEST_WITH_COMMAND_NUMBER = - new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER); + new KsqlRequest("sql", SOME_PROPS, SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER); @Rule public final ExpectedException expectedException = ExpectedException.none(); @Test public void shouldHandleNullStatement() { - assertThat(new KsqlRequest(null, SOME_PROPS, SOME_COMMAND_NUMBER).getKsql(), is("")); + assertThat(new KsqlRequest(null, SOME_PROPS, SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER) + .getKsql(), + is("")); } @Test public void shouldHandleNullProps() { - assertThat(new KsqlRequest("sql", null, SOME_COMMAND_NUMBER).getStreamsProperties(), + assertThat(new KsqlRequest("sql", null, SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER) + .getConfigOverrides(), is(Collections.emptyMap())); } @Test public void shouldHandleNullCommandNumber() { - assertThat(new KsqlRequest("sql", SOME_PROPS, null).getCommandSequenceNumber(), is(Optional.empty())); + assertThat(new KsqlRequest( + "sql", SOME_PROPS, Collections.emptyMap(), null).getCommandSequenceNumber(), + is(Optional.empty())); } @Test @@ -144,11 +163,13 @@ public void shouldSerializeToJsonWithCommandNumber() { @Test public void shouldImplementHashCodeAndEqualsCorrectly() { new EqualsTester() - .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER), - new KsqlRequest("sql", SOME_PROPS, SOME_COMMAND_NUMBER)) - .addEqualityGroup(new KsqlRequest("different-sql", SOME_PROPS, SOME_COMMAND_NUMBER)) - .addEqualityGroup(new KsqlRequest("sql", ImmutableMap.of(), SOME_COMMAND_NUMBER)) - .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, null)) + .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER), + new KsqlRequest("sql", SOME_PROPS, SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("different-sql", SOME_PROPS, + SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("sql", ImmutableMap.of(), + SOME_REQUEST_PROPS, SOME_COMMAND_NUMBER)) + .addEqualityGroup(new KsqlRequest("sql", SOME_PROPS, SOME_REQUEST_PROPS, null)) .testEquals(); } @@ -157,7 +178,7 @@ public void shouldHandleShortProperties() { // Given: final String jsonRequest = "{" + "\"ksql\":\"sql\"," - + "\"streamsProperties\":{" + + "\"configOverrides\":{" + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"" + "}}"; @@ -165,7 +186,7 @@ public void shouldHandleShortProperties() { final KsqlRequest request = deserialize(jsonRequest); // Then: - assertThat(request.getStreamsProperties().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), equalTo("earliest")); + assertThat(request.getConfigOverrides().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), equalTo("earliest")); } @Test @@ -176,15 +197,15 @@ public void shouldThrowOnInvalidPropertyValue() { ImmutableMap.of( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "not-parsable" ), - null - ); + SOME_REQUEST_PROPS, + null); expectedException.expect(KsqlException.class); expectedException.expectMessage(containsString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); expectedException.expectMessage(containsString("not-parsable")); // When: - request.getStreamsProperties(); + request.getConfigOverrides(); } @Test @@ -195,11 +216,11 @@ public void shouldHandleNullPropertyValue() { Collections.singletonMap( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ), - null - ); + SOME_REQUEST_PROPS, + null); // When: - final Map props = request.getStreamsProperties(); + final Map props = request.getConfigOverrides(); // Then: assertThat(props.keySet(), hasItem(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); @@ -214,11 +235,12 @@ public void shouldHandleOverridesOfTypeList() { ImmutableMap.of( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ImmutableList.of("some.type") ), + SOME_REQUEST_PROPS, null ); // When: - final Map props = request.getStreamsProperties(); + final Map props = request.getConfigOverrides(); // Then: assertThat( diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/RoutingOptions.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/RoutingOptions.java index 0b1130303102..7aa4e9d00d88 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/RoutingOptions.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/RoutingOptions.java @@ -21,4 +21,6 @@ public interface RoutingOptions { // The offset lag allowed from a given host long getOffsetLagAllowed(); + + boolean skipForwardRequest(); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index ca83c595b859..058675715f7c 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.execution.streams.RoutingFilter; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; @@ -85,17 +86,26 @@ public List locate( final HostInfo activeHost = metadata.getActiveHost(); final Set standByHosts = metadata.getStandbyHosts(); - LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); - final List allHosts = Stream.concat(Stream.of(activeHost), standByHosts.stream()) - .map(this::asKsqlHost) - .collect(Collectors.toList()); + // If the lookup is for a forwarded request, only filter localhost + List allHosts = null; + if (routingOptions.skipForwardRequest()) { + LOG.debug("Before filtering: Local host {} ", localHost); + allHosts = ImmutableList.of(new KsqlHostInfo(localHost.getHost(), localHost.getPort())); + } else { + LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); + allHosts = Stream.concat(Stream.of(activeHost), standByHosts.stream()) + .map(this::asKsqlHost) + .collect(Collectors.toList()); + } final RoutingFilter routingFilter = routingFilterFactory.createRoutingFilter(routingOptions, allHosts, activeHost, applicationId, stateStoreName, metadata.getPartition()); - // Filter out hosts based on active and liveness filters. + // Filter out hosts based on active, liveness and max lag filters. // The list is ordered by routing preference: active node is first, then standby nodes. // If heartbeat is not enabled, all hosts are considered alive. + // If the request is forwarded internally from another ksql server, only the max lag filter + // is applied. final List filteredHosts = allHosts.stream() .filter(routingFilter::filter) .map(this::asNode)