Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rodesai committed Sep 18, 2019
1 parent f6212f9 commit ea95693
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
Expand Down Expand Up @@ -824,15 +823,15 @@ public SchemaKGroupedStream groupBy(
throw new UnsupportedOperationException("Group by on windowed should always require rekey");
}
final KeySerde<Struct> structKeySerde = (KeySerde) keySerde;
final StreamGroupByKey<KStream<K, GenericRow>, KGroupedStream<Struct, GenericRow>> step =
final StreamGroupByKey step =
ExecutionStepFactory.streamGroupByKey(
contextStacker,
sourceStep,
(ExecutionStep) sourceStep,
Formats.of(rekeyedKeyFormat, valueFormat, SerdeOption.none())
);
return new SchemaKGroupedStream(
StreamGroupByBuilder.build(
(KStream<Object, GenericRow>) kstream,
(KStream) kstream,
step,
queryBuilder,
streamsFactories.getGroupedFactory()
Expand All @@ -856,16 +855,15 @@ public SchemaKGroupedStream groupBy(
final Optional<String> newKeyCol = getSchema().findValueColumn(aggregateKeyName)
.map(Column::name);

final StreamGroupBy<KStream<K, GenericRow>, KGroupedStream<Struct, GenericRow>> source =
ExecutionStepFactory.streamGroupBy(
contextStacker,
sourceStep,
Formats.of(rekeyedKeyFormat, valueFormat, SerdeOption.none()),
groupByExpressions
);
final StreamGroupBy<K> source = ExecutionStepFactory.streamGroupBy(
contextStacker,
sourceStep,
Formats.of(rekeyedKeyFormat, valueFormat, SerdeOption.none()),
groupByExpressions
);
return new SchemaKGroupedStream(
StreamGroupByBuilder.build(
(KStream<Object, GenericRow>) kstream,
kstream,
source,
queryBuilder,
streamsFactories.getGroupedFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
Expand Down Expand Up @@ -259,16 +258,15 @@ public SchemaKGroupedStream groupBy(
final Optional<String> newKeyField =
getSchema().findValueColumn(aggregateKeyName).map(Column::fullName);

final TableGroupBy<KTable<K, GenericRow>, KGroupedTable<Struct, GenericRow>> step =
ExecutionStepFactory.tableGroupBy(
contextStacker,
sourceTableStep,
Formats.of(groupedKeyFormat, valueFormat, SerdeOption.none()),
groupByExpressions
);
final TableGroupBy<K> step = ExecutionStepFactory.tableGroupBy(
contextStacker,
sourceTableStep,
Formats.of(groupedKeyFormat, valueFormat, SerdeOption.none()),
groupByExpressions
);
return new SchemaKGroupedTable(
TableGroupByBuilder.build(
(KTable<Object, GenericRow>) ktable,
ktable,
step,
queryBuilder,
streamsFactories.getGroupedFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.confluent.ksql.execution.plan;

import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.List;

public interface ExecutionStep<S> {
Expand All @@ -23,4 +24,8 @@ public interface ExecutionStep<S> {
List<ExecutionStep<?>> getSources();

S build(KsqlQueryBuilder queryBuilder);

/**
 * Returns the logical schema of this step.
 *
 * <p>The default implementation delegates to the schema held by the step's
 * properties, i.e. {@code getProperties().getSchema()}.
 *
 * @return the schema reported by this step's properties
 */
default LogicalSchema getSchema() {
return getProperties().getSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.expression.tree.Expression;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

@Immutable
public class StreamGroupBy<S, G> implements ExecutionStep<G> {
public class StreamGroupBy<K> implements ExecutionStep<KGroupedStream<Struct, GenericRow>> {
private final ExecutionStepProperties properties;
private final ExecutionStep<S> source;
private final ExecutionStep<KStream<K, GenericRow>> source;
private final Formats formats;
private final List<Expression> groupByExpressions;

public StreamGroupBy(
final ExecutionStepProperties properties,
final ExecutionStep<S> source,
final ExecutionStep<KStream<K, GenericRow>> source,
final Formats formats,
final List<Expression> groupByExpressions) {
this.properties = Objects.requireNonNull(properties, "properties");
Expand All @@ -57,8 +61,12 @@ public Formats getFormats() {
return formats;
}

/**
 * Returns the source step of this group-by.
 *
 * @return the step held in the {@code source} field, typed as producing a
 *     {@code KStream<K, GenericRow>}
 */
public ExecutionStep<KStream<K, GenericRow>> getSource() {
return source;
}

@Override
public G build(final KsqlQueryBuilder streamsBuilder) {
public KGroupedStream<Struct, GenericRow> build(final KsqlQueryBuilder streamsBuilder) {
throw new UnsupportedOperationException();
}

Expand All @@ -70,7 +78,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamGroupBy<?, ?> that = (StreamGroupBy<?, ?>) o;
final StreamGroupBy that = (StreamGroupBy) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

@Immutable
public class StreamGroupByKey<S, G> implements ExecutionStep<G> {
public class StreamGroupByKey implements ExecutionStep<KGroupedStream<Struct, GenericRow>> {
private final ExecutionStepProperties properties;
private final ExecutionStep<S> source;
private final ExecutionStep<KStream<Struct, GenericRow>> source;
private final Formats formats;

public StreamGroupByKey(
final ExecutionStepProperties properties,
final ExecutionStep<S> source,
final ExecutionStep<KStream<Struct, GenericRow>> source,
final Formats formats) {
this.properties = Objects.requireNonNull(properties, "properties");
this.formats = Objects.requireNonNull(formats, "formats");
Expand All @@ -45,7 +49,7 @@ public List<ExecutionStep<?>> getSources() {
return Collections.singletonList(source);
}

public ExecutionStep<S> getSource() {
public ExecutionStep<KStream<Struct, GenericRow>> getSource() {
return source;
}

Expand All @@ -54,7 +58,7 @@ public Formats getFormats() {
}

@Override
public G build(final KsqlQueryBuilder streamsBuilder) {
public KGroupedStream<Struct, GenericRow> build(final KsqlQueryBuilder streamsBuilder) {
throw new UnsupportedOperationException();
}

Expand All @@ -66,7 +70,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamGroupByKey<?, ?> that = (StreamGroupByKey<?, ?>) o;
final StreamGroupByKey that = (StreamGroupByKey) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.expression.tree.Expression;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;

@Immutable
public class TableGroupBy<T, G> implements ExecutionStep<G> {
public class TableGroupBy<K> implements ExecutionStep<KGroupedTable<Struct, GenericRow>> {
private final ExecutionStepProperties properties;
private final ExecutionStep<T> source;
private final ExecutionStep<KTable<K, GenericRow>> source;
private final Formats formats;
private final List<Expression> groupByExpressions;

public TableGroupBy(
final ExecutionStepProperties properties,
final ExecutionStep<T> source,
final ExecutionStep<KTable<K, GenericRow>> source,
final Formats formats,
final List<Expression> groupByExpressions
) {
Expand Down Expand Up @@ -58,8 +62,12 @@ public List<Expression> getGroupByExpressions() {
return groupByExpressions;
}

/**
 * Returns the source step of this table group-by.
 *
 * @return the step held in the {@code source} field, typed as producing a
 *     {@code KTable<K, GenericRow>}
 */
public ExecutionStep<KTable<K, GenericRow>> getSource() {
return source;
}

@Override
public G build(final KsqlQueryBuilder builder) {
public KGroupedTable<Struct, GenericRow> build(final KsqlQueryBuilder builder) {
throw new UnsupportedOperationException();
}

Expand All @@ -71,7 +79,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final TableGroupBy<?, ?> that = (TableGroupBy<?, ?>) o;
final TableGroupBy<?> that = (TableGroupBy<?>) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,11 @@ public static <K> TableTableJoin<KTable<K, GenericRow>> tableTableJoin(
);
}

public static <K> StreamGroupBy<KStream<K, GenericRow>, KGroupedStream<Struct, GenericRow>>
streamGroupBy(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<K, GenericRow>> sourceStep,
final Formats format,
final List<Expression> groupingExpressions
public static <K> StreamGroupBy<K> streamGroupBy(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<K, GenericRow>> sourceStep,
final Formats format,
final List<Expression> groupingExpressions
) {
final QueryContext queryContext = stacker.getQueryContext();
return new StreamGroupBy<>(
Expand All @@ -336,14 +335,13 @@ public static <K> TableTableJoin<KTable<K, GenericRow>> tableTableJoin(
);
}

public static <K> StreamGroupByKey<KStream<K, GenericRow>, KGroupedStream<Struct, GenericRow>>
streamGroupByKey(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<K, GenericRow>> sourceStep,
final Formats formats
public static StreamGroupByKey streamGroupByKey(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<Struct, GenericRow>> sourceStep,
final Formats formats
) {
final QueryContext queryContext = stacker.getQueryContext();
return new StreamGroupByKey<>(
return new StreamGroupByKey(
sourceStep.getProperties().withQueryContext(queryContext),
sourceStep,
formats
Expand All @@ -369,12 +367,11 @@ public static <K> TableTableJoin<KTable<K, GenericRow>> tableTableJoin(
);
}

public static <K> TableGroupBy<KTable<K, GenericRow>, KGroupedTable<Struct, GenericRow>>
tableGroupBy(
final QueryContext.Stacker stacker,
final ExecutionStep<KTable<K, GenericRow>> sourceStep,
final Formats format,
final List<Expression> groupingExpressions
public static <K> TableGroupBy<K> tableGroupBy(
final QueryContext.Stacker stacker,
final ExecutionStep<KTable<K, GenericRow>> sourceStep,
final Formats format,
final List<Expression> groupingExpressions
) {
final QueryContext queryContext = stacker.getQueryContext();
return new TableGroupBy<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GroupByMapper implements KeyValueMapper<Object, GenericRow, Object> {
class GroupByMapper<K> implements KeyValueMapper<K, GenericRow, Struct> {

private static final Logger LOG = LoggerFactory.getLogger(GroupByMapper.class);

Expand All @@ -43,7 +44,7 @@ class GroupByMapper implements KeyValueMapper<Object, GenericRow, Object> {
}

@Override
public Object apply(final Object key, final GenericRow row) {
public Struct apply(final K key, final GenericRow row) {
final String stringRowKey = IntStream.range(0, expressions.size())
.mapToObj(idx -> processColumn(idx, expressions.get(idx), row))
.collect(Collectors.joining(GROUP_BY_VALUE_SEPARATOR));
Expand Down
Loading

0 comments on commit ea95693

Please sign in to comment.