feat: move setup of the sink to plan builders #3360

Merged (3 commits), Sep 20, 2019
@@ -19,35 +19,21 @@
 import static java.util.Objects.requireNonNull;

 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
-import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.query.QueryId;
-import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.PhysicalSchema;
-import io.confluent.ksql.serde.KeySerde;
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.structured.SchemaKStream;
-import io.confluent.ksql.structured.SchemaKStream.Type;
 import io.confluent.ksql.structured.SchemaKTable;
-import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.QueryIdGenerator;
 import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.connect.data.ConnectSchema;
-import org.apache.kafka.streams.kstream.KStream;
public class KsqlStructuredDataOutputNode extends OutputNode {

@@ -56,7 +42,6 @@ public class KsqlStructuredDataOutputNode extends OutputNode {
   private final Optional<String> partitionByField;
   private final boolean doCreateInto;
   private final Set<SerdeOption> serdeOptions;
-  private final Set<Integer> implicitAndKeyFieldIndexes;

   public KsqlStructuredDataOutputNode(
       final PlanNodeId id,
@@ -88,7 +73,6 @@ public KsqlStructuredDataOutputNode(
     this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic");
     this.partitionByField = Objects.requireNonNull(partitionByField, "partitionByField");
     this.doCreateInto = doCreateInto;
-    this.implicitAndKeyFieldIndexes = implicitAndKeyColumnIndexesInValueSchema(schema);

     validatePartitionByField();
   }
@@ -134,20 +118,13 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         contextStacker
     );

-    final Serde<GenericRow> outputRowSerde = builder.buildValueSerde(
-        getKsqlTopic().getValueFormat().getFormatInfo(),
-        PhysicalSchema.from(getSchema(), serdeOptions),
-        contextStacker.getQueryContext()
-    );
-
     return result.into(
         getKsqlTopic().getKafkaTopicName(),
-        outputRowSerde,
         getSchema(),
         getKsqlTopic().getValueFormat(),
         serdeOptions,
-        implicitAndKeyFieldIndexes,
-        contextStacker
+        contextStacker,
+        builder
     );
   }

@@ -190,36 +167,4 @@ private void validatePartitionByField() {
       throw new IllegalArgumentException("keyField must match partition by field");
     }
   }
-
-  @SuppressWarnings("UnstableApiUsage")
-  private static Set<Integer> implicitAndKeyColumnIndexesInValueSchema(final LogicalSchema schema) {
-    final ConnectSchema valueSchema = schema.valueConnectSchema();
-
-    final Stream<Column> cols = Streams.concat(
-        schema.metadata().stream(),
-        schema.key().stream()
-    );
-
-    return cols
-        .map(Column::name)
-        .map(valueSchema::field)
-        .filter(Objects::nonNull)
-        .map(org.apache.kafka.connect.data.Field::index)
-        .collect(Collectors.toSet());
-  }
-
-  interface SinkFactory<K> {
-
-    SchemaKStream create(
-        KStream<K, GenericRow> kstream,
-        LogicalSchema schema,
-        KeySerde<K> keySerde,
-        KeyField keyField,
-        List<SchemaKStream> sourceSchemaKStreams,
-        Type type,
-        KsqlConfig ksqlConfig,
-        FunctionRegistry functionRegistry,
-        QueryContext queryContext
-    );
-  }
 }
@@ -232,6 +232,15 @@ private KTable aggregateWindowed(
   }

   private KeyFormat getKeyFormat(final WindowExpression windowExpression) {
+    if (ksqlConfig.getBoolean(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG)) {
+      return KeyFormat.windowed(
+          FormatInfo.of(Format.KAFKA),
+          WindowInfo.of(
+              WindowType.TUMBLING,
+              Optional.of(Duration.ofMillis(Long.MAX_VALUE))
+          )
+      );
+    }
     return KeyFormat.windowed(
         FormatInfo.of(Format.KAFKA),
         windowExpression.getKsqlWindowExpression().getWindowInfo()
@@ -39,13 +39,15 @@
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelectKey;
+import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamToTable;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StreamFilterBuilder;
import io.confluent.ksql.execution.streams.StreamGroupByBuilder;
import io.confluent.ksql.execution.streams.StreamMapValuesBuilder;
import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder;
+import io.confluent.ksql.execution.streams.StreamSinkBuilder;
import io.confluent.ksql.execution.streams.StreamSourceBuilder;
import io.confluent.ksql.execution.streams.StreamToTableBuilder;
import io.confluent.ksql.execution.streams.StreamsUtil;
Expand Down Expand Up @@ -80,7 +82,6 @@
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;

@@ -273,35 +274,28 @@ public SchemaKStream<K> withKeyField(final KeyField resultKeyField) {
     );
   }

-  @SuppressWarnings("unchecked")
   public SchemaKStream<K> into(
       final String kafkaTopicName,
-      final Serde<GenericRow> topicValueSerDe,
       final LogicalSchema outputSchema,
       final ValueFormat valueFormat,
       final Set<SerdeOption> options,
-      final Set<Integer> rowkeyIndexes,
-      final QueryContext.Stacker contextStacker
+      final QueryContext.Stacker contextStacker,
+      final KsqlQueryBuilder queryBuilder
   ) {
-    kstream
-        .mapValues(row -> {
-          if (row == null) {
-            return null;
-          }
-          final List<Object> columns = new ArrayList<>();
-          for (int i = 0; i < row.getColumns().size(); i++) {
-            if (!rowkeyIndexes.contains(i)) {
-              columns.add(row.getColumns().get(i));
-            }
-          }
-          return new GenericRow(columns);
-        }).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe));
-    final ExecutionStep<KStream<K, GenericRow>> step = ExecutionStepFactory.streamSink(
+    final StreamSink<K> step = ExecutionStepFactory.streamSink(
         contextStacker,
         outputSchema,
         Formats.of(keyFormat, valueFormat, options),
         sourceStep,
         kafkaTopicName
     );
+    StreamSinkBuilder.build(
+        kstream,
+        step,
+        (fmt, schema, ctx) -> keySerde,

rodesai (Contributor, Author), commenting on the (fmt, schema, ctx) -> keySerde line:
Once we build the streams app from a visitor, we'll actually get a factory from the source node. For now, we can just pass in the serde we have.
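The lambda being discussed here implies the sink builders accept a small key serde factory rather than a finished serde. As a rough sketch of that shape (the interface name and parameter types below are assumptions inferred from the lambda, not the actual ksql API):

// Hypothetical sketch only: name and signature are illustrative.
@FunctionalInterface
interface KeySerdeFactory<K> {
  // Given the declared key format, the physical schema, and the query
  // context, produce the key serde for the sink topic.
  KeySerde<K> create(FormatInfo format, PhysicalSchema schema, QueryContext queryContext);
}

// For now the PR satisfies it with a lambda that ignores its arguments and
// returns the serde the node already holds:
// (fmt, schema, ctx) -> keySerde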

big-andy-coates (Contributor):
I actually think that the logical model shouldn't know anything about serializers. The source node should know about schemas, format, and serde options only. The physical builder should be able to convert these to a serde, or a serde factory.
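A sketch of that split, using only calls already visible elsewhere in this diff (buildValueSerde, PhysicalSchema.from, Formats.of); variable names are illustrative:

// Inside a physical builder such as StreamSinkBuilder: the execution step
// carries only declarative Formats (key/value format plus serde options),
// and the builder converts them into a concrete serde at build time.
final Serde<GenericRow> valueSerde = queryBuilder.buildValueSerde(
    valueFormat.getFormatInfo(),                 // declared value format
    PhysicalSchema.from(outputSchema, options),  // logical schema + serde options
    queryContext                                 // context used for naming
);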

rodesai (Contributor, Author), Sep 19, 2019:
@big-andy-coates That's what I mean. By "source node" I mean the source execution step of this step. So the physical builder will do something like:

class KStreamAndKeySerdeFactory<K> {
    KStream<K, GenericRow> kstream;
    KeySerdeFactory<K> factory;
}

KStreamAndKeySerdeFactory<K> visitStreamSink(final StreamSink<K> streamSink) {
    // Visit the source step first; it yields the physical stream and the
    // key serde factory for this part of the topology.
    final KStreamAndKeySerdeFactory<K> k = streamSink.getSource().accept(this);
    // Build the sink from what the source step produced.
    StreamSinkBuilder.build(k.kstream, streamSink, k.factory, ...);
    ...
}

+        queryBuilder
+    );
     return new SchemaKStream<>(
         kstream,
         step,
@@ -27,10 +27,12 @@
import io.confluent.ksql.execution.plan.TableFilter;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.execution.plan.TableMapValues;
+import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.TableFilterBuilder;
import io.confluent.ksql.execution.streams.TableGroupByBuilder;
import io.confluent.ksql.execution.streams.TableMapValuesBuilder;
+import io.confluent.ksql.execution.streams.TableSinkBuilder;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
Expand All @@ -44,17 +46,14 @@
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.streams.StreamsFactories;
import io.confluent.ksql.util.KsqlConfig;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Produced;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class SchemaKTable<K> extends SchemaKStream<K> {
@@ -117,38 +116,28 @@ public SchemaKTable(
   }

   @Override
-  @SuppressWarnings("unchecked")
   public SchemaKTable<K> into(
       final String kafkaTopicName,
-      final Serde<GenericRow> topicValueSerDe,
       final LogicalSchema outputSchema,
       final ValueFormat valueFormat,
       final Set<SerdeOption> options,
-      final Set<Integer> rowkeyIndexes,
-      final QueryContext.Stacker contextStacker
+      final QueryContext.Stacker contextStacker,
+      final KsqlQueryBuilder builder
   ) {

-    ktable.toStream()
-        .mapValues(row -> {
-              if (row == null) {
-                return null;
-              }
-              final List<Object> columns = new ArrayList<>();
-              for (int i = 0; i < row.getColumns().size(); i++) {
-                if (!rowkeyIndexes.contains(i)) {
-                  columns.add(row.getColumns().get(i));
-                }
-              }
-              return new GenericRow(columns);
-            }
-        ).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe));
-
-    final ExecutionStep<KTable<K, GenericRow>> step = ExecutionStepFactory.tableSink(
+    final TableSink<K> step = ExecutionStepFactory.tableSink(
         contextStacker,
         outputSchema,
         sourceTableStep,
         Formats.of(keyFormat, valueFormat, options),
         kafkaTopicName
     );
+    TableSinkBuilder.build(
+        ktable,
+        step,
+        (fmt, schema, ctx) -> keySerde,
+        builder
+    );
     return new SchemaKTable<>(
         ktable,
         step,
@@ -190,7 +190,8 @@ public void before() {
         new QueryContext.Stacker(queryId)
             .push(inv.getArgument(0).toString()));

-    when(ksqlStreamBuilder.buildKeySerde(any(), any(), any())).thenReturn((KeySerde)keySerde);
+    when(ksqlStreamBuilder.buildKeySerde(any(), any(), any()))
+        .thenReturn((KeySerde)keySerde);
     when(ksqlStreamBuilder.buildKeySerde(any(), any(), any(), any())).thenReturn((KeySerde)keySerde);
     when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde);
     when(ksqlStreamBuilder.getFunctionRegistry()).thenReturn(functionRegistry);