feat: move setup of the sink to plan builders (#3360)

This patch moves the setup of stream and table sinks (SchemaKStream.into and
SchemaKTable.into) into the execution plan builders. It also moves the
computation of the rowkey/rowtime column indexes to exclude from the output
into the plan builders.
rodesai authored Sep 20, 2019
1 parent a292f05 commit bfbdc20
Showing 16 changed files with 768 additions and 199 deletions.
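
At the call sites, the change is that SchemaKStream#into and SchemaKTable#into
no longer receive a pre-built value serde or the set of column indexes to drop;
they receive the KsqlQueryBuilder and let the plan builders derive both. A
before/after sketch of the call in KsqlStructuredDataOutputNode.buildStream,
using only names that appear in the diff below:

// Before: the output node pre-built the value serde and the index set itself.
result.into(
    getKsqlTopic().getKafkaTopicName(),
    outputRowSerde,                  // built via builder.buildValueSerde(...)
    getSchema(),
    getKsqlTopic().getValueFormat(),
    serdeOptions,
    implicitAndKeyFieldIndexes,      // indexes of implicit/key columns to drop
    contextStacker
);

// After: the StreamSink execution step and StreamSinkBuilder recreate both.
result.into(
    getKsqlTopic().getKafkaTopicName(),
    getSchema(),
    getKsqlTopic().getValueFormat(),
    serdeOptions,
    contextStacker,
    builder
);
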
KsqlStructuredDataOutputNode.java
@@ -19,35 +19,21 @@
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.KeySerde;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKStream.Type;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryIdGenerator;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.streams.kstream.KStream;

public class KsqlStructuredDataOutputNode extends OutputNode {

@@ -56,7 +42,6 @@ public class KsqlStructuredDataOutputNode {
private final Optional<String> partitionByField;
private final boolean doCreateInto;
private final Set<SerdeOption> serdeOptions;
private final Set<Integer> implicitAndKeyFieldIndexes;

public KsqlStructuredDataOutputNode(
final PlanNodeId id,
@@ -88,7 +73,6 @@ public KsqlStructuredDataOutputNode(
this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic");
this.partitionByField = Objects.requireNonNull(partitionByField, "partitionByField");
this.doCreateInto = doCreateInto;
this.implicitAndKeyFieldIndexes = implicitAndKeyColumnIndexesInValueSchema(schema);

validatePartitionByField();
}
@@ -134,20 +118,13 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
contextStacker
);

final Serde<GenericRow> outputRowSerde = builder.buildValueSerde(
getKsqlTopic().getValueFormat().getFormatInfo(),
PhysicalSchema.from(getSchema(), serdeOptions),
contextStacker.getQueryContext()
);

return result.into(
getKsqlTopic().getKafkaTopicName(),
outputRowSerde,
getSchema(),
getKsqlTopic().getValueFormat(),
serdeOptions,
implicitAndKeyFieldIndexes,
contextStacker
contextStacker,
builder
);
}

@@ -190,36 +167,4 @@ private void validatePartitionByField() {
throw new IllegalArgumentException("keyField must match partition by field");
}
}

@SuppressWarnings("UnstableApiUsage")
private static Set<Integer> implicitAndKeyColumnIndexesInValueSchema(final LogicalSchema schema) {
final ConnectSchema valueSchema = schema.valueConnectSchema();

final Stream<Column> cols = Streams.concat(
schema.metadata().stream(),
schema.key().stream()
);

return cols
.map(Column::name)
.map(valueSchema::field)
.filter(Objects::nonNull)
.map(org.apache.kafka.connect.data.Field::index)
.collect(Collectors.toSet());
}

interface SinkFactory<K> {

SchemaKStream create(
KStream<K, GenericRow> kstream,
LogicalSchema schema,
KeySerde<K> keySerde,
KeyField keyField,
List<SchemaKStream> sourceSchemaKStreams,
Type type,
KsqlConfig ksqlConfig,
FunctionRegistry functionRegistry,
QueryContext queryContext
);
}
}
@@ -232,6 +232,15 @@ private KTable aggregateWindowed(
}

private KeyFormat getKeyFormat(final WindowExpression windowExpression) {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG)) {
return KeyFormat.windowed(
FormatInfo.of(Format.KAFKA),
WindowInfo.of(
WindowType.TUMBLING,
Optional.of(Duration.ofMillis(Long.MAX_VALUE))
)
);
}
return KeyFormat.windowed(
FormatInfo.of(Format.KAFKA),
windowExpression.getKsqlWindowExpression().getWindowInfo()
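
This hunk looks incidental but feeds the same refactor: the sink execution
steps record their key format through Formats.of(keyFormat, valueFormat,
options), so the windowed-aggregation code now has to report the
legacy-compatible key format explicitly rather than leaving it implicit in an
inline serde (that connection is an inference from this patch, not something it
states). Restated, the two branches produce:

// Legacy flag set: every windowed key is described as a tumbling window of
// effectively unbounded size, regardless of the query's window clause.
final KeyFormat legacyFormat = KeyFormat.windowed(
    FormatInfo.of(Format.KAFKA),
    WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofMillis(Long.MAX_VALUE)))
);

// Flag unset: the key format carries the window info declared by the query.
final KeyFormat queryFormat = KeyFormat.windowed(
    FormatInfo.of(Format.KAFKA),
    windowExpression.getKsqlWindowExpression().getWindowInfo()
);
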
SchemaKStream.java
@@ -39,13 +39,15 @@
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamToTable;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StreamFilterBuilder;
import io.confluent.ksql.execution.streams.StreamGroupByBuilder;
import io.confluent.ksql.execution.streams.StreamMapValuesBuilder;
import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder;
import io.confluent.ksql.execution.streams.StreamSinkBuilder;
import io.confluent.ksql.execution.streams.StreamSourceBuilder;
import io.confluent.ksql.execution.streams.StreamToTableBuilder;
import io.confluent.ksql.execution.streams.StreamsUtil;
@@ -80,7 +82,6 @@
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;

@@ -273,35 +274,28 @@ public SchemaKStream<K> withKeyField(final KeyField resultKeyField) {
);
}

@SuppressWarnings("unchecked")
public SchemaKStream<K> into(
final String kafkaTopicName,
final Serde<GenericRow> topicValueSerDe,
final LogicalSchema outputSchema,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final Set<Integer> rowkeyIndexes,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final KsqlQueryBuilder queryBuilder
) {
kstream
.mapValues(row -> {
if (row == null) {
return null;
}
final List<Object> columns = new ArrayList<>();
for (int i = 0; i < row.getColumns().size(); i++) {
if (!rowkeyIndexes.contains(i)) {
columns.add(row.getColumns().get(i));
}
}
return new GenericRow(columns);
}).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe));
final ExecutionStep<KStream<K, GenericRow>> step = ExecutionStepFactory.streamSink(
final StreamSink<K> step = ExecutionStepFactory.streamSink(
contextStacker,
outputSchema,
Formats.of(keyFormat, valueFormat, options),
sourceStep,
kafkaTopicName
);
StreamSinkBuilder.build(
kstream,
step,
(fmt, schema, ctx) -> keySerde,
queryBuilder
);
return new SchemaKStream<>(
kstream,
step,
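
StreamSinkBuilder.build now owns everything the deleted lines above did inline.
The diff pins down only its four-argument shape: the KStream, the StreamSink
step, a key-serde factory lambda, and the KsqlQueryBuilder. The following is a
sketch reassembled from the removed code, not the actual implementation; the
KeySerdeFactory type, the accessors on StreamSink and Formats, and the method
layout are all assumptions. Imports match those shown in the hunks above.

public final class StreamSinkBuilder {

  private StreamSinkBuilder() {
  }

  /** Assumed shape of the factory satisfied by the (fmt, schema, ctx) -> keySerde lambda. */
  @FunctionalInterface
  public interface KeySerdeFactory<K> {
    KeySerde<K> create(FormatInfo format, PhysicalSchema schema, QueryContext queryContext);
  }

  public static <K> void build(
      final KStream<K, GenericRow> kstream,
      final StreamSink<K> streamSink,
      final KeySerdeFactory<K> keySerdeFactory,
      final KsqlQueryBuilder queryBuilder
  ) {
    final LogicalSchema schema = streamSink.getSchema();             // assumed accessor
    final Formats formats = streamSink.getFormats();                 // assumed accessor
    final QueryContext queryContext = streamSink.getQueryContext();  // assumed accessor
    final PhysicalSchema physicalSchema = PhysicalSchema.from(schema, formats.getOptions());

    // Value serde construction, formerly inline in KsqlStructuredDataOutputNode.buildStream.
    final Serde<GenericRow> valueSerde = queryBuilder.buildValueSerde(
        formats.getValueFormat().getFormatInfo(),
        physicalSchema,
        queryContext
    );

    final KeySerde<K> keySerde = keySerdeFactory.create(
        formats.getKeyFormat().getFormatInfo(), physicalSchema, queryContext);

    // Positions of the implicit (ROWTIME) and key (ROWKEY) columns inside the
    // value schema, formerly computed by implicitAndKeyColumnIndexesInValueSchema.
    final ConnectSchema valueSchema = schema.valueConnectSchema();
    final Set<Integer> rowkeyIndexes = Streams
        .concat(schema.metadata().stream(), schema.key().stream())
        .map(Column::name)
        .map(valueSchema::field)
        .filter(Objects::nonNull)
        .map(org.apache.kafka.connect.data.Field::index)
        .collect(Collectors.toSet());

    // Drop those duplicated columns and write out, formerly inline in SchemaKStream.into.
    kstream
        .mapValues(row -> {
          if (row == null) {
            return null;
          }
          final List<Object> columns = new ArrayList<>();
          for (int i = 0; i < row.getColumns().size(); i++) {
            if (!rowkeyIndexes.contains(i)) {
              columns.add(row.getColumns().get(i));
            }
          }
          return new GenericRow(columns);
        })
        .to(streamSink.getTopicName(), Produced.with(keySerde, valueSerde)); // assumed accessor
  }
}
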
SchemaKTable.java
@@ -27,10 +27,12 @@
import io.confluent.ksql.execution.plan.TableFilter;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.execution.plan.TableMapValues;
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.TableFilterBuilder;
import io.confluent.ksql.execution.streams.TableGroupByBuilder;
import io.confluent.ksql.execution.streams.TableMapValuesBuilder;
import io.confluent.ksql.execution.streams.TableSinkBuilder;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
@@ -44,17 +46,14 @@
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.streams.StreamsFactories;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class SchemaKTable<K> extends SchemaKStream<K> {
@@ -117,38 +116,28 @@ public SchemaKTable(
}

@Override
@SuppressWarnings("unchecked")
public SchemaKTable<K> into(
final String kafkaTopicName,
final Serde<GenericRow> topicValueSerDe,
final LogicalSchema outputSchema,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final Set<Integer> rowkeyIndexes,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final KsqlQueryBuilder builder
) {

ktable.toStream()
.mapValues(row -> {
if (row == null) {
return null;
}
final List<Object> columns = new ArrayList<>();
for (int i = 0; i < row.getColumns().size(); i++) {
if (!rowkeyIndexes.contains(i)) {
columns.add(row.getColumns().get(i));
}
}
return new GenericRow(columns);
}
).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe));

final ExecutionStep<KTable<K, GenericRow>> step = ExecutionStepFactory.tableSink(
final TableSink<K> step = ExecutionStepFactory.tableSink(
contextStacker,
outputSchema,
sourceTableStep,
Formats.of(keyFormat, valueFormat, options),
kafkaTopicName
);
TableSinkBuilder.build(
ktable,
step,
(fmt, schema, ctx) -> keySerde,
builder
);
return new SchemaKTable<>(
ktable,
step,
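
SchemaKTable#into now delegates to TableSinkBuilder with the same
(fmt, schema, ctx) -> keySerde factory lambda. Judging by the inline code
deleted above, the one table-specific step the builder must keep is converting
the changelog to a stream before writing. A sketch of that tail end, assuming
the rest mirrors the StreamSinkBuilder reconstruction earlier; writeTable and
withoutRowkeyColumns are hypothetical names:

// Hypothetical tail of TableSinkBuilder.build: same serde and rowkey-index
// handling as the stream sink, applied after ktable.toStream() (exactly what
// the removed inline code did).
private static <K> void writeTable(
    final KTable<K, GenericRow> ktable,
    final String topicName,
    final KeySerde<K> keySerde,
    final Serde<GenericRow> valueSerde,
    final Set<Integer> rowkeyIndexes
) {
  ktable.toStream()
      .mapValues(row -> withoutRowkeyColumns(row, rowkeyIndexes)) // hypothetical shared helper
      .to(topicName, Produced.with(keySerde, valueSerde));
}
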
@@ -190,7 +190,8 @@ public void before() {
new QueryContext.Stacker(queryId)
.push(inv.getArgument(0).toString()));

when(ksqlStreamBuilder.buildKeySerde(any(), any(), any())).thenReturn((KeySerde)keySerde);
when(ksqlStreamBuilder.buildKeySerde(any(), any(), any()))
.thenReturn((KeySerde)keySerde);
when(ksqlStreamBuilder.buildKeySerde(any(), any(), any(), any())).thenReturn((KeySerde)keySerde);
when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde);
when(ksqlStreamBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
(Diff truncated: the remaining changed files are not shown in this capture.)