Process commands so that WITH clause is always resolved before executing it #2436

Merged · 17 commits · Mar 28, 2019
53 changes: 40 additions & 13 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
@@ -31,6 +31,8 @@
import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier;
import io.confluent.ksql.services.DefaultServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.topic.DefaultTopicInjector;
import io.confluent.ksql.topic.TopicInjector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
@@ -41,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -51,7 +54,8 @@ public class KsqlContext {
private final ServiceContext serviceContext;
private final KsqlConfig ksqlConfig;
private final KsqlEngine ksqlEngine;
private final SchemaInjector schemaInjector;
private final Function<ServiceContext, SchemaInjector> schemaInjectorFactory;
private final Function<KsqlExecutionContext, TopicInjector> topicInjectorFactory;

/**
* Create a KSQL context object with the given properties. A KSQL context has its own metastore
@@ -72,23 +76,30 @@ public static KsqlContext create(
functionRegistry,
serviceId);

final DefaultSchemaInjector schemaInjector = new DefaultSchemaInjector(
new SchemaRegistryTopicSchemaSupplier(serviceContext.getSchemaRegistryClient()));

return new KsqlContext(serviceContext, ksqlConfig, engine, schemaInjector);
return new KsqlContext(
serviceContext,
ksqlConfig,
engine,
sc -> new DefaultSchemaInjector(
new SchemaRegistryTopicSchemaSupplier(sc.getSchemaRegistryClient())),
DefaultTopicInjector::new);
}

@VisibleForTesting
KsqlContext(
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final SchemaInjector schemaInjector
final Function<ServiceContext, SchemaInjector> schemaInjectorFactory,
final Function<KsqlExecutionContext, TopicInjector> topicInjectorFactory
) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.schemaInjector = Objects.requireNonNull(schemaInjector, "schemaInjector");
this.schemaInjectorFactory = Objects
.requireNonNull(schemaInjectorFactory, "schemaInjectorFactory");
this.topicInjectorFactory = Objects
.requireNonNull(topicInjectorFactory, "topicInjectorFactory");
}

public ServiceContext getServiceContext() {
@@ -110,12 +121,25 @@ public List<QueryMetadata> sql(final String sql, final Map<String, Object> overr
final List<ParsedStatement> statements = ksqlEngine.parse(sql);

final KsqlExecutionContext sandbox = ksqlEngine.createSandbox();
final SchemaInjector sandboxSchemaInjector = schemaInjectorFactory
.apply(sandbox.getServiceContext());
final TopicInjector sandboxTopicInjector = topicInjectorFactory.apply(sandbox);

for (ParsedStatement stmt : statements) {
execute(
sandbox,
stmt,
ksqlConfig,
overriddenProperties,
sandboxSchemaInjector,
sandboxTopicInjector);
}

statements.forEach(stmt -> execute(sandbox, stmt, ksqlConfig, overriddenProperties));

final SchemaInjector schemaInjector = schemaInjectorFactory.apply(serviceContext);
final TopicInjector topicInjector = topicInjectorFactory.apply(ksqlEngine);
final List<QueryMetadata> queries = new ArrayList<>();
for (final ParsedStatement parsed : statements) {
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties)
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, schemaInjector, topicInjector)
.getQuery()
.ifPresent(queries::add);
}
@@ -157,10 +181,13 @@ private ExecuteResult execute(
final KsqlExecutionContext executionContext,
final ParsedStatement stmt,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
final Map<String, Object> overriddenProperties,
final SchemaInjector schemaInjector,
final TopicInjector topicInjector) {
final PreparedStatement<?> prepared = executionContext.prepare(stmt);
final PreparedStatement<?> withSchema = schemaInjector.forStatement(prepared);
return executionContext.execute(withSchema, ksqlConfig, overriddenProperties);
final PreparedStatement<?> withInferredTopic =
topicInjector.forStatement(withSchema, ksqlConfig, overriddenProperties);
return executionContext.execute(withInferredTopic, ksqlConfig, overriddenProperties);
}
}
(next file: KsqlExecutionContext.java)
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
@@ -46,6 +47,11 @@ public interface KsqlExecutionContext {
*/
MetaStore getMetaStore();

/**
* @return the service context used for this execution context
*/
ServiceContext getServiceContext();
Contributor (@big-andy-coates) commented:
I feel like we should avoid leaking the service context out of the engine context. It leads to tangled code. The fact that the engine/execution_context is internally using a service context is an implementation detail that shouldn't, IMHO, leak out of the interface.

It looks like you've added this so that DefaultTopicInjector can get hold of the service context from the engine, but you actually have the service context at the point you're trying to get the injector from the factory.

Maybe just have the factory take both a ServiceContext and MetaStore instance. Then you can just invoke via:

topicInjectorFactory.apply(sandboxedServiceContext, ksqlEngine.getMetastore());

I know it's tempting, as it's convenient since we already pass around the engine, but it feels wrong to expose it, IMHO. Maybe I'm wrong, but it just smells to me. We should aim to keep this interface as succinct as possible.
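A minimal, self-contained sketch of the two-argument factory shape suggested here (type names stubbed for illustration; this is not code from the PR):

import java.util.function.BiFunction;

// Stand-ins for the real KSQL types, just to make the shape concrete.
interface ServiceContext { }
interface MetaStore { }
interface TopicInjector { }

class FactoryShapeSketch {
  // The suggested factory: both dependencies are passed explicitly, so the
  // engine's internal wiring never leaks through the interface.
  private final BiFunction<ServiceContext, MetaStore, TopicInjector> topicInjectorFactory;

  FactoryShapeSketch(final BiFunction<ServiceContext, MetaStore, TopicInjector> factory) {
    this.topicInjectorFactory = factory;
  }

  TopicInjector injectorFor(final ServiceContext sandboxedServiceContext, final MetaStore metaStore) {
    // Mirrors the call site suggested above:
    // topicInjectorFactory.apply(sandboxedServiceContext, ksqlEngine.getMetastore())
    return topicInjectorFactory.apply(sandboxedServiceContext, metaStore);
  }
}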

Contributor Author (@agavra) replied, Mar 15, 2019:
I had the same instinct as you; in fact, that was the way I originally had it. The problem is that not just any sandboxed service context can be used: it has to be exactly the same sandboxed context that the sandboxed engine uses. Otherwise it can't validate a sequence of statements that create topics, for example:

0: CREATE STREAM Y AS SELECT * FROM X; 
1: CREATE STREAM Z AS SELECT * FROM Y;

When command 0 runs, it updates the service context with a new topic Y with p partitions and r replicas. When command 1 runs, it expects to be able to read from the topic for Y, but if the injector was built from any other service context, that topic doesn't exist there (see the runnable sketch after this comment).

So, unfortunately, the coupling is an artifact of where the sandbox is created (deep inside the engine).

To do what you suggest, we could pipe the ServiceContext through ksqlEngine#createSandbox, into the constructor for SandboxedExecutionContext, and then into EngineContext#createSandbox from every place we call createSandbox.

This isn't so bad, but it's tricky to get right (to make sure that we always use the same execution context as the engine we are using). As soon as I thought about that, I decided that it was appropriate to expose the service context because (as it stands) the coupling is actually an attribute of the engine, not just an implementation detail.

If you strongly disagree, I can do one of two things:

  • Expose an Optional<ServiceContext> getSandboxContext which will only return if it is a sandbox (ew 🤢 )
  • Do the refactor that I outlined above but risk loose coupling of the context and the engine
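To make the coupling concrete, a tiny self-contained sketch (all names illustrative, not the real KSQL types): validation only succeeds because the injector shares the sandbox's own service context.

import java.util.HashSet;
import java.util.Set;

class SandboxCouplingSketch {

  // Stand-in for a sandboxed ServiceContext: just tracks known topics.
  static class FakeServiceContext {
    final Set<String> topics = new HashSet<>();
  }

  // Stand-in for a sandboxed execution context: executing a statement
  // registers its sink topic in the sandbox's service context.
  static class Sandbox {
    final FakeServiceContext serviceContext = new FakeServiceContext();

    void execute(final String sinkTopic) {
      serviceContext.topics.add(sinkTopic);
    }
  }

  // Stand-in for a TopicInjector: resolves a source topic against a context.
  static class Injector {
    final FakeServiceContext serviceContext;

    Injector(final FakeServiceContext serviceContext) {
      this.serviceContext = serviceContext;
    }

    void resolve(final String sourceTopic) {
      if (!serviceContext.topics.contains(sourceTopic)) {
        throw new IllegalStateException("Unknown source topic: " + sourceTopic);
      }
    }
  }

  public static void main(final String[] args) {
    final Sandbox sandbox = new Sandbox();
    // Built from the *same* context the sandbox mutates:
    final Injector injector = new Injector(sandbox.serviceContext);

    sandbox.execute("Y");   // 0: CREATE STREAM Y AS SELECT * FROM X;
    injector.resolve("Y");  // 1: CREATE STREAM Z AS SELECT * FROM Y; resolves fine

    // An injector built from any other FakeServiceContext would throw here,
    // even though the statement sequence itself is valid.
  }
}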

Contributor (@big-andy-coates) replied, Mar 22, 2019:
Hummm... makes sense. Let's see how it looks.

Contributor commented:
The impersonation work that Sergio is doing will mean that each request to the engine will need to come with its own ServiceContext that runs things in the context of the initiating user.

With that in mind, it probably makes sense to not expose the service context from the engine, but instead pass it each time.
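For context, a sketch of the per-request shape this comment describes (purely hypothetical; the impersonation work is not part of this PR):

import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;

// Hypothetical signature only: the caller supplies a ServiceContext scoped
// to the initiating user on every call, instead of the engine owning one.
interface PerRequestExecution {
  ExecuteResult execute(
      ServiceContext userServiceContext,
      PreparedStatement<?> statement,
      KsqlConfig ksqlConfig,
      Map<String, Object> overriddenProperties);
}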


/**
* Retrieve the details of a persistent query.
*
(next file: Analyzer.java)
@@ -57,6 +57,7 @@
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StringUtil;
import io.confluent.ksql.util.WithClauseUtil;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -136,34 +137,22 @@ private void setIntoProperties(final Sink sink) {
}

if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) != null) {
try {
final int numberOfPartitions = Integer.parseInt(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString()
);
analysis.getIntoProperties().put(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
numberOfPartitions
);
final int numberOfPartitions =
WithClauseUtil.parsePartitions(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS));

} catch (final NumberFormatException e) {
throw new KsqlException(
"Invalid number of partitions in WITH clause: "
+ sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString());
}
analysis.getIntoProperties().put(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
numberOfPartitions
);
}

if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS) != null) {
try {
final short numberOfReplications =
Short.parseShort(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString()
);
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
} catch (final NumberFormatException e) {
throw new KsqlException("Invalid number of replications in WITH clause: " + sink
.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString());
}
final short numberOfReplications =
WithClauseUtil.parseReplicas(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS));
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
}
}
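WithClauseUtil itself is not shown in this diff. A plausible minimal sketch of the extracted helper, assuming it does no more than centralize the NumberFormatException-to-KsqlException translation that the deleted inline blocks performed:

import io.confluent.ksql.util.KsqlException;

public final class WithClauseUtil {

  private WithClauseUtil() {
  }

  // Parses the PARTITIONS value from a WITH clause, translating parse
  // failures into the same KsqlException shape as the deleted code above.
  public static int parsePartitions(final Object value) {
    try {
      return Integer.parseInt(value.toString());
    } catch (final NumberFormatException e) {
      throw new KsqlException("Invalid number of partitions in WITH clause: " + value, e);
    }
  }

  // Same for REPLICAS; note the short return type, matching the caller.
  public static short parseReplicas(final Object value) {
    try {
      return Short.parseShort(value.toString());
    } catch (final NumberFormatException e) {
      throw new KsqlException("Invalid number of replications in WITH clause: " + value, e);
    }
  }
}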

(next file: KsqlEngine.java)
@@ -131,6 +131,11 @@ public MetaStore getMetaStore() {
return primaryContext.getMetaStore();
}

@Override
public ServiceContext getServiceContext() {
return serviceContext;
}

public DdlCommandExec getDdlCommandExec() {
return primaryContext.getDdlCommandExec();
}
(next file: SandboxedExecutionContext.java)
@@ -21,6 +21,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.List;
@@ -44,6 +45,11 @@ public MetaStore getMetaStore() {
return engineContext.getMetaStore();
}

@Override
public ServiceContext getServiceContext() {
return engineContext.getServiceContext();
}

@Override
public KsqlExecutionContext createSandbox() {
return new SandboxedExecutionContext(engineContext);
(next file: KsqlStructuredDataOutputNode.java)
@@ -31,6 +31,7 @@
import io.confluent.ksql.structured.QueryContext;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.topic.TopicProperties;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryIdGenerator;
@@ -42,6 +43,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.connect.data.Field;
@@ -141,19 +143,22 @@ public SchemaKStream<?> buildStream(

final KsqlStructuredDataOutputNode noRowKey = outputNodeBuilder.build();
if (doCreateInto) {
final SourceTopicProperties sourceTopicProperties = getSourceTopicProperties(
getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
outputProperties,
serviceContext.getTopicClient(),
ksqlConfig
);
final Supplier<TopicDescription> sourceTopicDescription = () ->
getSourceTopicPropertiesFromKafka(
getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
serviceContext.getTopicClient());

createSinkTopic(
noRowKey.getKafkaTopicName(),
serviceContext.getTopicClient(),
shouldBeCompacted(result),
sourceTopicProperties.partitions,
sourceTopicProperties.replicas);
new TopicProperties.Builder()
.withName(noRowKey.getKafkaTopicName())
.withOverrides(outputProperties)
.withKsqlConfig(ksqlConfig)
.withSource(sourceTopicDescription)
.build());
}

result.into(
noRowKey.getKafkaTopicName(),
noRowKey.getKsqlTopic().getKsqlTopicSerDe()
@@ -236,69 +241,22 @@ private void addAvroSchemaToResultTopic(final Builder builder) {
}

private static void createSinkTopic(
final String kafkaTopicName,
final KafkaTopicClient kafkaTopicClient,
final boolean isCompacted,
final int numberOfPartitions,
final short numberOfReplications
final TopicProperties topicProperties
) {
final Map<String, ?> config = isCompacted
? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
: Collections.emptyMap();

kafkaTopicClient.createTopic(kafkaTopicName,
numberOfPartitions,
numberOfReplications,
kafkaTopicClient.createTopic(
topicProperties.getTopicName(),
topicProperties.getPartitions(),
topicProperties.getReplicas(),
config
);
}

private static SourceTopicProperties getSourceTopicProperties(
final String kafkaTopicName,
final Map<String, Object> sinkProperties,
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig
) {
final Map ksqlProperties = ksqlConfig.values();
if (ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null
|| ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) {
return getSinkTopicPropertiesLegacyWay(sinkProperties, ksqlConfig);
}
// Don't request topic properties from Kafka if both are set in WITH clause.
if (sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null
&& sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) {
return new SourceTopicProperties(
(Integer) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY),
(Short) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)
);
}
final TopicDescription topicDescription = getSourceTopicPropertiesFromKafka(
kafkaTopicName,
kafkaTopicClient);

final int partitions = (Integer) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
topicDescription.partitions().size());
final short replicas = (Short) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
(short) topicDescription.partitions().get(0).replicas().size());

return new SourceTopicProperties(partitions, replicas);
}

private static SourceTopicProperties getSinkTopicPropertiesLegacyWay(
final Map<String, Object> sinkProperties,
final KsqlConfig ksqlConfig
) {
final int partitions = (Integer) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY));
final short replicas = (Short) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
ksqlConfig.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY));
return new SourceTopicProperties(partitions, replicas);
}
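TopicProperties and its Builder live in io.confluent.ksql.topic and are outside this diff. Judging from the legacy resolution logic deleted above, a self-contained sketch of the precedence the builder presumably encodes: explicit WITH-clause overrides win, then legacy KsqlConfig values, and only then is the source topic consulted, lazily, so Kafka is not queried when both values are already known.

import java.util.Map;
import java.util.function.Supplier;

// Illustration of the precedence rules only; not the real Builder.
final class TopicPropertiesSketch {

  static int resolvePartitions(
      final Map<String, Object> withClauseOverrides, // WITH (...) values
      final Integer legacyConfigPartitions,          // nullable legacy KsqlConfig value
      final Supplier<Integer> sourceTopicPartitions  // lazy Kafka describe
  ) {
    final Object override = withClauseOverrides.get("partitions"); // key name illustrative
    if (override != null) {
      return (Integer) override;
    }
    if (legacyConfigPartitions != null) {
      return legacyConfigPartitions;
    }
    // Only now touch Kafka, mirroring the Supplier<TopicDescription> handed
    // to TopicProperties.Builder#withSource in the new buildStream code.
    return sourceTopicPartitions.get();
  }
}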

private static TopicDescription getSourceTopicPropertiesFromKafka(
final String kafkaTopicName,
final KafkaTopicClient kafkaTopicClient
@@ -318,17 +276,6 @@ private KsqlTopicSerDe getTopicSerde() {
return ksqlTopic.getKsqlTopicSerDe();
}

private static class SourceTopicProperties {

private final int partitions;
private final short replicas;

SourceTopicProperties(final int partitions, final short replicas) {
this.partitions = partitions;
this.replicas = replicas;
}
}

public static class Builder {

private PlanNodeId id;