feat: move toTable kstreams calls to plan builder
This patch moves the conversion from a KStream to a KTable into the plan
builder, and cleans up the related code (and tests) in SchemaKStream and
DataSourceNode.
rodesai committed Sep 13, 2019
1 parent fd112a4 commit cbca611
Showing 19 changed files with 440 additions and 124 deletions.
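In practical terms, call sites no longer construct the reduce serde themselves: they hand the KsqlQueryBuilder to toTable, and serde creation plus the stream-to-table conversion happen inside the plan builder. A minimal before/after sketch of the call shape, lifted from the DataSourceNode change below (the surrounding variable names are illustrative, not from this commit):

    // Before: the caller built the value serde and passed it in.
    schemaKStream.toTable(
        keyFormat,
        valueFormat,
        builder.buildValueSerde(formatInfo, physicalSchema, queryContext),
        contextStacker);

    // After: the caller passes the query builder instead; the serde is
    // built behind the plan builder.
    schemaKStream.toTable(
        keyFormat,
        valueFormat,
        contextStacker,
        builder);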
================================================================
@@ -27,8 +27,6 @@
 import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.PhysicalSchema;
-import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.services.KafkaTopicClient;
 import io.confluent.ksql.structured.SchemaKStream;
 import java.util.List;
@@ -145,12 +143,8 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
     return schemaKStream.toTable(
         dataSource.getKsqlTopic().getKeyFormat(),
         dataSource.getKsqlTopic().getValueFormat(),
-        builder.buildValueSerde(
-            dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
-            PhysicalSchema.from(getSchema(), SerdeOption.none()),
-            reduceContextStacker.getQueryContext()
-        ),
-        reduceContextStacker);
+        reduceContextStacker,
+        builder);
   }
 
   interface SchemaKStreamFactory {
================================================================
@@ -15,6 +15,7 @@
 
 package io.confluent.ksql.streams;
 
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Grouped;
================================================================
@@ -15,6 +15,7 @@
 
 package io.confluent.ksql.streams;
 
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Joined;
================================================================
@@ -15,6 +15,7 @@
 
 package io.confluent.ksql.streams;
 
+import io.confluent.ksql.execution.streams.MaterializedFactory;
 import io.confluent.ksql.util.KsqlConfig;
 import java.util.Objects;
 
================================================================
@@ -21,6 +21,8 @@
 import io.confluent.ksql.execution.plan.ExecutionStep;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.function.KsqlAggregateFunction;
 import io.confluent.ksql.function.UdafAggregator;
@@ -38,8 +40,6 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.serde.WindowInfo;
-import io.confluent.ksql.streams.MaterializedFactory;
-import io.confluent.ksql.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import java.time.Duration;
 import java.util.List;
================================================================
@@ -21,6 +21,8 @@
 import io.confluent.ksql.execution.plan.ExecutionStep;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.function.KsqlAggregateFunction;
 import io.confluent.ksql.function.TableAggregationFunction;
@@ -33,8 +35,6 @@
 import io.confluent.ksql.serde.KeySerde;
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
-import io.confluent.ksql.streams.MaterializedFactory;
-import io.confluent.ksql.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
 import java.util.List;
================================================================
@@ -39,9 +39,12 @@
 import io.confluent.ksql.execution.plan.SelectExpression;
 import io.confluent.ksql.execution.plan.StreamMapValues;
 import io.confluent.ksql.execution.plan.StreamSource;
+import io.confluent.ksql.execution.plan.StreamToTable;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
 import io.confluent.ksql.execution.streams.StreamMapValuesBuilder;
 import io.confluent.ksql.execution.streams.StreamSourceBuilder;
+import io.confluent.ksql.execution.streams.StreamToTableBuilder;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
 import io.confluent.ksql.metastore.model.DataSource;
@@ -56,7 +59,6 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.streams.StreamsFactories;
-import io.confluent.ksql.streams.StreamsUtil;
 import io.confluent.ksql.util.IdentifierUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.SchemaUtil;
@@ -67,19 +69,16 @@
 import java.util.Optional;
 import java.util.Set;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.streams.Topology.AutoOffsetReset;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueStore;
 
 // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
 @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@@ -223,41 +222,26 @@ public static SchemaKStream<?> forSource(
     this.streamsFactories = requireNonNull(streamsFactories);
   }
 
+  @SuppressWarnings("unchecked")
   public SchemaKTable<K> toTable(
       final KeyFormat keyFormat,
       final ValueFormat valueFormat,
-      final Serde<GenericRow> valueSerde,
-      final QueryContext.Stacker contextStacker
+      final QueryContext.Stacker contextStacker,
+      final KsqlQueryBuilder queryBuilder
   ) {
-    final Materialized<K, GenericRow, KeyValueStore<Bytes, byte[]>> materialized =
-        streamsFactories.getMaterializedFactory().create(
-            keySerde,
-            valueSerde,
-            StreamsUtil.buildOpName(contextStacker.getQueryContext()));
-    final KTable<K, GenericRow> ktable = kstream
-        // 1. mapValues to transform null records into Optional<GenericRow>.EMPTY. We eventually
-        // need to aggregate the KStream to produce the KTable. However the KStream aggregator
-        // filters out records with null keys or values. For tables, a null value for a key
-        // represents that the key was deleted. So we preserve these "tombstone" records by
-        // converting them to a not-null representation.
-        .mapValues(Optional::ofNullable)
-
-        // 2. Group by the key, so that we can:
-        .groupByKey()
-
-        // 3. Aggregate the KStream into a KTable using a custom aggregator that handles
-        //    Optional.EMPTY
-        .aggregate(
-            () -> null,
-            (k, value, oldValue) -> value.orElse(null),
-            materialized);
-    final ExecutionStep<KTable<K, GenericRow>> step = ExecutionStepFactory.streamToTable(
-        contextStacker,
-        Formats.of(keyFormat, valueFormat, Collections.emptySet()),
-        sourceStep
-    );
+    final StreamToTable<KStream<K, GenericRow>, KTable<K, GenericRow>> step =
+        ExecutionStepFactory.streamToTable(
+            contextStacker,
+            Formats.of(keyFormat, valueFormat, Collections.emptySet()),
+            sourceStep
+        );
     return new SchemaKTable<>(
-        ktable,
+        StreamToTableBuilder.build(
+            (KStream) kstream,
+            (StreamToTable) step,
+            queryBuilder,
+            streamsFactories.getMaterializedFactory()
+        ),
         step,
         keyFormat,
         keySerde,
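The inline conversion removed above (wrap values in Optional so the Kafka Streams aggregator does not drop tombstones, group by the existing key, then aggregate back to the latest value) now presumably lives behind StreamToTableBuilder.build. A self-contained sketch of that pattern in plain Kafka Streams, mirroring the removed code (the class and method names here are illustrative):

    import java.util.Optional;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    final class StreamToTableSketch {

      // Convert a changelog-like KStream into a KTable, preserving tombstones.
      static <K, V> KTable<K, V> toTable(final KStream<K, V> stream) {
        return stream
            // 1. The KStream aggregator filters out records with null values,
            //    but for a table a null value means "key deleted", so wrap
            //    every value in an Optional to keep tombstones non-null.
            .mapValues(Optional::ofNullable)
            // 2. Group by the existing key (no repartitioning required).
            .groupByKey()
            // 3. Aggregate to "latest value wins", unwrapping the Optional so
            //    a deletion becomes a null table entry again.
            .aggregate(
                () -> null,
                (key, value, oldValue) -> value.orElse(null));
      }
    }

Running this for real would also need serdes configured (the production code passes a Materialized built from the query's key/value serdes); the sketch leans on defaults to stay short.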
================================================================
@@ -27,6 +27,7 @@
 import io.confluent.ksql.execution.plan.SelectExpression;
 import io.confluent.ksql.execution.plan.TableMapValues;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.execution.streams.TableMapValuesBuilder;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
@@ -40,7 +41,6 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.streams.StreamsFactories;
-import io.confluent.ksql.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import java.util.ArrayList;
 import java.util.Collections;
================================================================
@@ -532,26 +532,16 @@ public void shouldBuildTableByConvertingFromStream() {
     assertThat(returned, is(table));
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void shouldBuildReduceSerdeCorrectlyWhenBuildingTable() {
+  public void shouldPassBuilderWhenBuildingTable() {
     // Given:
     final DataSourceNode node = buildNodeWithMockSource();
 
     // When:
     node.buildStream(ksqlStreamBuilder);
 
     // Then:
-    verify(ksqlStreamBuilder).buildValueSerde(
-        eq(FormatInfo.of(Format.JSON)),
-        eq(PhysicalSchema.from(node.getSchema(), SerdeOption.none())),
-        queryContextCaptor.capture()
-    );
-    assertThat(
-        queryContextCaptor.getValue().getContext(),
-        equalTo(ImmutableList.of("0", "reduce"))
-    );
-    verify(stream).toTable(any(), any(), same(rowSerde), any());
+    verify(stream).toTable(any(), any(), any(), same(ksqlStreamBuilder));
   }
 
   @Test
@@ -564,7 +554,7 @@ public void shouldBuildTableWithCorrectContext() {
     node.buildStream(ksqlStreamBuilder);
 
     // Then:
-    verify(stream).toTable(any(), any(), any(), stackerCaptor.capture());
+    verify(stream).toTable(any(), any(), stackerCaptor.capture(), any());
     assertThat(
         stackerCaptor.getValue().getQueryContext().getContext(),
         equalTo(ImmutableList.of("0", "reduce")));
================================================================
@@ -54,8 +54,8 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.serde.WindowInfo;
-import io.confluent.ksql.streams.MaterializedFactory;
-import io.confluent.ksql.streams.StreamsUtil;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import java.time.Duration;
 import java.util.Collections;
================================================================
@@ -65,8 +65,8 @@
 import io.confluent.ksql.serde.KeySerde;
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.ValueFormat;
-import io.confluent.ksql.streams.MaterializedFactory;
-import io.confluent.ksql.streams.StreamsUtil;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.testutils.AnalysisTestUtil;
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
================================================================
@@ -78,9 +78,9 @@
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.streams.GroupedFactory;
 import io.confluent.ksql.streams.JoinedFactory;
-import io.confluent.ksql.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
 import io.confluent.ksql.streams.StreamsFactories;
-import io.confluent.ksql.streams.StreamsUtil;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.structured.SchemaKStream.Type;
 import io.confluent.ksql.testutils.AnalysisTestUtil;
 import io.confluent.ksql.util.KsqlConfig;
@@ -734,8 +734,8 @@ public void shouldBuildStepForToTable() {
     final SchemaKTable result = initialSchemaKStream.toTable(
         keyFormat,
         valueFormat,
-        leftSerde,
-        childContextStacker
+        childContextStacker,
+        queryBuilder
     );
 
     // Then:
@@ -765,8 +765,8 @@ public void shouldConvertToTableWithCorrectProperties() {
     final SchemaKTable result = initialSchemaKStream.toTable(
         keyFormat,
         valueFormat,
-        leftSerde,
-        childContextStacker
+        childContextStacker,
+        queryBuilder
     );
 
     // Then:
@@ -776,53 +776,6 @@
     assertThat(result.getKtable(), is(table));
   }
 
-  @Test
-  public void shouldConvertToOptionalBeforeGroupingInToTable() {
-    // Given:
-    givenInitialSchemaKStreamUsesMocks();
-    when(mockKStream.mapValues(any(ValueMapper.class))).thenReturn(mockKStream);
-    final KGroupedStream groupedStream = mock(KGroupedStream.class);
-    final KTable table = mock(KTable.class);
-    when(mockKStream.groupByKey()).thenReturn(groupedStream);
-    when(groupedStream.aggregate(any(), any(), any())).thenReturn(table);
-
-    // When:
-    initialSchemaKStream.toTable(keyFormat, valueFormat, leftSerde, childContextStacker);
-
-    // Then:
-    InOrder inOrder = Mockito.inOrder(mockKStream);
-    final ArgumentCaptor<ValueMapper> captor = ArgumentCaptor.forClass(ValueMapper.class);
-    inOrder.verify(mockKStream).mapValues(captor.capture());
-    inOrder.verify(mockKStream).groupByKey();
-    assertThat(captor.getValue().apply(null), equalTo(Optional.empty()));
-    final GenericRow nonNull = new GenericRow(1, 2, 3);
-    assertThat(captor.getValue().apply(nonNull), equalTo(Optional.of(nonNull)));
-  }
-
-  @Test
-  public void shouldComputeAggregateCorrectlyInToTable() {
-    // Given:
-    givenInitialSchemaKStreamUsesMocks();
-    when(mockKStream.mapValues(any(ValueMapper.class))).thenReturn(mockKStream);
-    final KGroupedStream groupedStream = mock(KGroupedStream.class);
-    final KTable table = mock(KTable.class);
-    when(mockKStream.groupByKey()).thenReturn(groupedStream);
-    when(groupedStream.aggregate(any(), any(), any())).thenReturn(table);
-
-    // When:
-    initialSchemaKStream.toTable(keyFormat, valueFormat, leftSerde, childContextStacker);
-
-    // Then:
-    final ArgumentCaptor<Initializer> initCaptor = ArgumentCaptor.forClass(Initializer.class);
-    final ArgumentCaptor<Aggregator> captor = ArgumentCaptor.forClass(Aggregator.class);
-    verify(groupedStream)
-        .aggregate(initCaptor.capture(), captor.capture(), same(mockMaterialized));
-    assertThat(initCaptor.getValue().apply(), is(nullValue()));
-    assertThat(captor.getValue().apply(null, Optional.empty(), null), is(nullValue()));
-    final GenericRow nonNull = new GenericRow(1, 2, 3);
-    assertThat(captor.getValue().apply(null, Optional.of(nonNull), null), is(nonNull));
-  }
-
 @SuppressWarnings("unchecked")
 @Test
 public void shouldPerformStreamToStreamLeftJoin() {
================================================================
@@ -74,9 +74,9 @@
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.streams.GroupedFactory;
 import io.confluent.ksql.streams.JoinedFactory;
-import io.confluent.ksql.streams.MaterializedFactory;
+import io.confluent.ksql.execution.streams.MaterializedFactory;
 import io.confluent.ksql.streams.StreamsFactories;
-import io.confluent.ksql.streams.StreamsUtil;
+import io.confluent.ksql.execution.streams.StreamsUtil;
 import io.confluent.ksql.structured.SchemaKStream.Type;
 import io.confluent.ksql.testutils.AnalysisTestUtil;
 import io.confluent.ksql.util.KsqlConfig;
================================================================
@@ -40,12 +40,19 @@ public ExecutionStepProperties getProperties() {
     return properties;
   }
 
   @Override
   public List<ExecutionStep<?>> getSources() {
     return ImmutableList.of(source);
   }
 
+  public ExecutionStep<S> getSource() {
+    return source;
+  }
+
+  public Formats getFormats() {
+    return formats;
+  }
+
   @Override
   public T build(final KsqlQueryBuilder builder) {
     throw new UnsupportedOperationException();
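The execution step stays a passive description of the plan: build() still throws, and the new getSource() and getFormats() getters are what an external builder (here, StreamToTableBuilder) reads when it realizes the step. A toy sketch of that node/builder split, with deliberately simplified, hypothetical types that are not from this commit:

    import java.util.function.Function;

    // Toy plan node: describes a conversion but cannot execute itself.
    final class ToTableStep<S, T> {
      private final S source;
      private final String formats;

      ToTableStep(final S source, final String formats) {
        this.source = source;
        this.formats = formats;
      }

      S getSource() { return source; }

      String getFormats() { return formats; }

      T build() { throw new UnsupportedOperationException("built by the streams module"); }
    }

    // Toy builder: owns the execution logic the plan node deliberately lacks.
    final class ToTableBuilder {
      static <S, T> T build(final ToTableStep<S, T> step, final Function<S, T> convert) {
        return convert.apply(step.getSource());
      }
    }

Keeping execution out of the plan node is what lets the physical plan be serialized and rebuilt by a separate module, which is the direction this commit moves in.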
================================================================
@@ -13,7 +13,7 @@
  * specific language governing permissions and limitations under the License.
  */
 
-package io.confluent.ksql.streams;
+package io.confluent.ksql.execution.streams;
 
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.util.KsqlConfig;