Skip to content

Commit

Permalink
fix: added special handling for forwarded pull query request (#4597)
Browse files Browse the repository at this point in the history
added request properties to ksql request
  • Loading branch information
vpapavas authored Feb 27, 2020
1 parent c8f5353 commit ba4fe74
Show file tree
Hide file tree
Showing 41 changed files with 441 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import static io.confluent.ksql.util.KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static io.confluent.ksql.util.KsqlRequestConfig.KSQL_REQUEST_CONFIG_PROPERTY_PREFIX;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlRequestConfig;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -39,6 +41,7 @@ public class KsqlConfigResolver implements ConfigResolver {
private static final ConfigDef CONSUMER_CONFIG_DEF = getConfigDef(ConsumerConfig.class);
private static final ConfigDef PRODUCER_CONFIG_DEF = getConfigDef(ProducerConfig.class);
private static final ConfigDef KSQL_CONFIG_DEF = KsqlConfig.CURRENT_DEF;
private static final ConfigDef REQUEST_CONFIG_DEF = KsqlRequestConfig.CURRENT_DEF;

private static final List<PrefixedConfig> STREAM_CONFIG_DEFS = ImmutableList.of(
new PrefixedConfig(StreamsConfig.CONSUMER_PREFIX, CONSUMER_CONFIG_DEF),
Expand All @@ -50,7 +53,9 @@ public class KsqlConfigResolver implements ConfigResolver {

@Override
public Optional<ConfigItem> resolve(final String propertyName, final boolean strict) {
if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX)
if (propertyName.startsWith(KSQL_REQUEST_CONFIG_PROPERTY_PREFIX)) {
return resolveRequestConfig(propertyName);
} else if (propertyName.startsWith(KSQL_CONFIG_PROPERTY_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX)) {
return resolveKsqlConfig(propertyName);
}
Expand Down Expand Up @@ -100,6 +105,15 @@ private static Optional<ConfigItem> resolveKsqlConfig(final String propertyName)
return Optional.empty();
}

private static Optional<ConfigItem> resolveRequestConfig(final String propertyName) {
final Optional<ConfigItem> possibleItem = resolveConfig("", REQUEST_CONFIG_DEF, propertyName);
if (possibleItem.isPresent()) {
return possibleItem;
}

return Optional.empty();
}

private static Optional<ConfigItem> resolveConfig(
final String prefix,
final ConfigDef def,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;

public class KsqlRequestConfig extends AbstractConfig {

public static final String KSQL_REQUEST_CONFIG_PROPERTY_PREFIX = "request.ksql.";

public static final ConfigDef CURRENT_DEF = buildConfigDef();

public static final String KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING =
"request.ksql.query.pull.skip.forwarding";
public static final Boolean KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DEFAULT = false;
private static final String KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DOC =
"Controls whether a ksql host forwards a pull query request to another host";

private static ConfigDef buildConfigDef() {
final ConfigDef configDef = new ConfigDef()
.define(
KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING,
Type.BOOLEAN,
KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DEFAULT,
ConfigDef.Importance.MEDIUM,
KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING_DOC
);
return configDef;
}

public KsqlRequestConfig(final Map<?, ?> props) {
super(CURRENT_DEF, props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void execute(
final InsertValues insertValues = statement.getStatement();
final MetaStore metaStore = executionContext.getMetaStore();
final KsqlConfig config = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides());
.cloneWithPropertyOverwrite(statement.getConfigOverrides());

final DataSource dataSource = getDataSource(config, metaStore, insertValues);

Expand Down Expand Up @@ -210,7 +210,7 @@ private ProducerRecord<byte[], byte[]> buildRecord(

final InsertValues insertValues = statement.getStatement();
final KsqlConfig config = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides());
.cloneWithPropertyOverwrite(statement.getConfigOverrides());

try {
final RowData row = extractRow(
Expand Down
18 changes: 14 additions & 4 deletions ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public KsqlPlan plan(
final ConfiguredStatement<?> statement
) {
return EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.create(
primaryContext, serviceContext, statement.getConfig(), statement.getConfigOverrides())
.plan(statement);
}

Expand All @@ -198,7 +199,7 @@ public ExecuteResult execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfigOverrides(),
statement.getConfig()
)
);
Expand All @@ -211,7 +212,11 @@ public TransientQueryMetadata executeQuery(
) {
try {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.create(
primaryContext,
serviceContext,
statement.getConfig(),
statement.getConfigOverrides())
.executeQuery(statement);
registerQuery(query);
return query;
Expand All @@ -230,7 +235,12 @@ public QueryMetadata executeQuery(
final Consumer<GenericRow> rowConsumer
) {
final QueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.create(
primaryContext,
serviceContext,
statement.getConfig(),
statement.getConfigOverrides()
)
.executeQuery(statement, rowConsumer);
registerQuery(query);
return query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public KsqlPlan plan(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
statement.getConfigOverrides()
).plan(statement);
}

Expand All @@ -127,7 +127,7 @@ public ExecuteResult execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfigOverrides(),
statement.getConfig()
)
);
Expand All @@ -142,7 +142,7 @@ public TransientQueryMetadata executeQuery(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
statement.getConfigOverrides()
).executeQuery(statement);
}

Expand All @@ -153,7 +153,7 @@ public QueryMetadata executeQuery(final ServiceContext serviceContext,
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
statement.getConfigOverrides()
).executeQuery(statement, rowConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final CreateSource withSchema = addSchemaFields(statement, valueSchema);
final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withSchema);
final ConfiguredStatement<CreateSource> configured = ConfiguredStatement
.of(prepared, statement.getOverrides(), statement.getConfig());
.of(prepared, statement.getConfigOverrides(), statement.getConfig());

return Optional.of(configured);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public final class ConfiguredStatement<T extends Statement> {

private final PreparedStatement<T> statement;
@EffectivelyImmutable
private final ImmutableMap<String, Object> overrides;
private final ImmutableMap<String, Object> configOverrides;
@EffectivelyImmutable
private final ImmutableMap<String, Object> requestProperties;
private final KsqlConfig config;

public static <S extends Statement> ConfiguredStatement<S> of(
Expand All @@ -46,13 +48,36 @@ public static <S extends Statement> ConfiguredStatement<S> of(
return new ConfiguredStatement<>(statement, overrides, config);
}

public static <S extends Statement> ConfiguredStatement<S> of(
final PreparedStatement<S> statement,
final Map<String, ?> overrides,
final Map<String, ?> requestProperties,
final KsqlConfig config
) {
return new ConfiguredStatement<>(statement, overrides, requestProperties, config);
}

private ConfiguredStatement(
final PreparedStatement<T> statement,
final Map<String, ?> overrides,
final Map<String, ?> configOverrides,
final KsqlConfig config
) {
this.statement = requireNonNull(statement, "statement");
this.overrides = ImmutableMap.copyOf(requireNonNull(overrides, "overrides"));
this.configOverrides = ImmutableMap.copyOf(requireNonNull(configOverrides, "overrides"));
this.config = requireNonNull(config, "config");
this.requestProperties = ImmutableMap.of();
}

private ConfiguredStatement(
final PreparedStatement<T> statement,
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final KsqlConfig config
) {
this.statement = requireNonNull(statement, "statement");
this.configOverrides = ImmutableMap.copyOf(requireNonNull(configOverrides, "overrides"));
this.requestProperties = ImmutableMap.copyOf(requireNonNull(
requestProperties, "serverProperties"));
this.config = requireNonNull(config, "config");
}

Expand All @@ -69,27 +94,35 @@ public String getStatementText() {
return statement.getStatementText();
}

public Map<String, Object> getOverrides() {
return overrides;
public Map<String, Object> getConfigOverrides() {
return configOverrides;
}

public Map<String, Object> getRequestProperties() {
return requestProperties;
}

public KsqlConfig getConfig() {
return config;
}

public ConfiguredStatement<T> withConfig(final KsqlConfig config) {
return new ConfiguredStatement<>(this.statement, this.overrides, config);
return new ConfiguredStatement<>(this.statement, this.configOverrides, config);
}

public ConfiguredStatement<T> withProperties(final Map<String, Object> properties) {
public ConfiguredStatement<T> withConfigOverrides(final Map<String, Object> properties) {
return new ConfiguredStatement<>(this.statement, properties, this.config);
}

public ConfiguredStatement<T> withRequestProperties(final Map<String, Object> properties) {
return new ConfiguredStatement<>(this.statement, this.configOverrides, properties, this.config);
}

public ConfiguredStatement<T> withStatement(
final String statementText,
final T statement) {
return new ConfiguredStatement<>(
PreparedStatement.of(statementText, statement), this.overrides, this.config);
PreparedStatement.of(statementText, statement), this.configOverrides, this.config);
}

@Override
Expand All @@ -102,20 +135,21 @@ public boolean equals(final Object o) {
}
final ConfiguredStatement<?> that = (ConfiguredStatement<?>) o;
return Objects.equals(statement, that.statement)
&& Objects.equals(overrides, that.overrides)
&& Objects.equals(configOverrides, that.configOverrides)
&& Objects.equals(config, that.config);
}

@Override
public int hashCode() {
return Objects.hash(statement, overrides, config);
return Objects.hash(statement, configOverrides, config);
}

@Override
public String toString() {
return "ConfiguredStatement{"
+ "statement=" + statement
+ ", overrides=" + overrides
+ ", configOverrides=" + configOverrides
+ ", requestProperties=" + requestProperties
+ ", config=" + config
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
final TopicProperties.Builder topicPropertiesBuilder
) {
final String prefix =
statement.getOverrides().getOrDefault(
statement.getConfigOverrides().getOrDefault(
KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG,
statement.getConfig().getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void shouldTakeDefensiveCopyOfProperties() {
props.put("other", "thing");

// Then:
assertThat(statement.getOverrides(), is(ImmutableMap.of("this", "that")));
assertThat(statement.getConfigOverrides(), is(ImmutableMap.of("this", "that")));
}

@Test
Expand All @@ -60,6 +60,6 @@ public void shouldReturnImmutableProperties() {
.of(prepared, new HashMap<>(), CONFIG);

// Then:
assertThat(statement.getOverrides(), is(instanceOf(ImmutableMap.class)));
assertThat(statement.getConfigOverrides(), is(instanceOf(ImmutableMap.class)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public RestResponse<KsqlEntityList> makeKsqlRequest(final URI serverEndPoint, fi
public RestResponse<List<StreamedRow>> makeQueryRequest(
final URI serverEndPoint,
final String sql,
final Map<String, ?> properties
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ RestResponse<KsqlEntityList> makeKsqlRequest(
String sql
);

/**
* Send pull query request to remote Ksql server.
* @param serverEndPoint the remote destination
* @param sql the pull query statement
* @param configOverrides the config overrides provided by the client
* @param requestProperties the request metadata provided by the server
* @return the result of pull query evaluation
*/
RestResponse<List<StreamedRow>> makeQueryRequest(
URI serverEndPoint,
String sql,
Map<String, ?> properties
Map<String, ?> configOverrides,
Map<String, ?> requestProperties
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private Optional<ConfiguredKsqlPlan> planStatement(final ParsedStatement stmt) {
return Optional.of(
ConfiguredKsqlPlan.of(
rewritePlan(plan),
reformatted.getOverrides(),
reformatted.getConfigOverrides(),
reformatted.getConfig()
)
);
Expand Down
Loading

0 comments on commit ba4fe74

Please sign in to comment.