Skip to content

Commit

Permalink
feat: move selectKey impl to plan builder (#3362)
Browse files Browse the repository at this point in the history
Moves rekeying from SchemaKStream and into an execution plan builder.
  • Loading branch information
rodesai authored Sep 18, 2019
1 parent a909737 commit f312fcc
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.util.StructKeyUtil;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.StreamFilter;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamToTable;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StreamFilterBuilder;
import io.confluent.ksql.execution.streams.StreamMapValuesBuilder;
import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder;
import io.confluent.ksql.execution.streams.StreamSourceBuilder;
import io.confluent.ksql.execution.streams.StreamToTableBuilder;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
Expand Down Expand Up @@ -697,34 +700,19 @@ public SchemaKStream<Struct> selectKey(
);
}

final int keyIndexInValue = getSchema().valueColumnIndex(proposedKey.fullName())
.orElseThrow(IllegalStateException::new);

final KStream keyedKStream = kstream
.filter((key, value) -> value != null && extractColumn(keyIndexInValue, value) != null)
.selectKey((key, value) ->
StructKeyUtil.asStructKey(extractColumn(keyIndexInValue, value).toString()))
.mapValues((key, row) -> {
if (updateRowKey) {
final Object rowKey = key.get(key.schema().fields().get(0));
row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey);
}
return row;
});

final KeyField newKeyField = getSchema().isMetaColumn(fieldName)
? resultantKeyField.withName(Optional.empty())
: resultantKeyField;

final KeySerde<Struct> selectKeySerde = keySerde.rebind(StructKeyUtil.ROWKEY_SERIALIZED_SCHEMA);
final ExecutionStep<KStream<K, GenericRow>> step = ExecutionStepFactory.streamSelectKey(
final StreamSelectKey<K> step = ExecutionStepFactory.streamSelectKey(
contextStacker,
sourceStep,
fieldName,
updateRowKey
);
return (SchemaKStream<Struct>) new SchemaKStream(
keyedKStream,
return new SchemaKStream<>(
StreamSelectKeyBuilder.build(kstream, step),
step,
keyFormat,
selectKeySerde,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.streams.TableFilterBuilder;
import io.confluent.ksql.execution.streams.TableMapValuesBuilder;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
import io.confluent.ksql.execution.util.StructKeyUtil;
import java.util.Collections;
import org.apache.kafka.connect.data.Struct;
import org.easymock.EasyMock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;

@Immutable
public class StreamSelectKey<S> implements ExecutionStep<S> {
public class StreamSelectKey<K> implements ExecutionStep<KStream<Struct, GenericRow>> {
private final ExecutionStepProperties properties;
private final ExecutionStep<S> source;
private final ExecutionStep<KStream<K, GenericRow>> source;
private final String fieldName;
private final boolean updateRowKey;

public StreamSelectKey(
final ExecutionStepProperties properties,
final ExecutionStep<S> source,
final ExecutionStep<KStream<K, GenericRow>> source,
final String fieldName,
final boolean updateRowKey) {
this.properties = Objects.requireNonNull(properties, "properties");
Expand All @@ -48,8 +51,16 @@ public List<ExecutionStep<?>> getSources() {
return Collections.singletonList(source);
}

public boolean isUpdateRowKey() {
return updateRowKey;
}

public String getFieldName() {
return fieldName;
}

@Override
public S build(final KsqlQueryBuilder streamsBuilder) {
public KStream<Struct, GenericRow> build(final KsqlQueryBuilder streamsBuilder) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.structured;
package io.confluent.ksql.execution.util;

import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.SchemaUtil;
Expand All @@ -25,7 +25,7 @@
/**
* Helper for dealing with Struct keys.
*/
final class StructKeyUtil {
public final class StructKeyUtil {

private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder
.struct()
Expand All @@ -35,15 +35,15 @@ final class StructKeyUtil {
private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD =
ROWKEY_STRUCT_SCHEMA.fields().get(0);

static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from(
public static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from(
(ConnectSchema) ROWKEY_STRUCT_SCHEMA,
false
);

private StructKeyUtil() {
}

static Struct asStructKey(final String rowKey) {
public static Struct asStructKey(final String rowKey) {
final Struct keyStruct = new Struct(ROWKEY_STRUCT_SCHEMA);
keyStruct.put(ROWKEY_FIELD, rowKey);
return keyStruct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public static <K> StreamStreamJoin<KStream<K, GenericRow>> streamStreamJoin(
);
}

public static <K> StreamSelectKey<KStream<K, GenericRow>> streamSelectKey(
@SuppressWarnings("unchecked")
public static <K> StreamSelectKey<K> streamSelectKey(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<K, GenericRow>> source,
final String fieldName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.SchemaUtil;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;

public final class StreamSelectKeyBuilder {
private StreamSelectKeyBuilder() {
}

public static KStream<Struct, GenericRow> build(
final KStream<?, GenericRow> kstream,
final StreamSelectKey<?> selectKey) {
final LogicalSchema sourceSchema = selectKey.getSources().get(0).getProperties().getSchema();
final Column keyColumn = sourceSchema.findValueColumn(selectKey.getFieldName())
.orElseThrow(IllegalArgumentException::new);
final int keyIndexInValue = sourceSchema.valueColumnIndex(keyColumn.fullName())
.orElseThrow(IllegalStateException::new);
final boolean updateRowKey = selectKey.isUpdateRowKey();
return kstream
.filter((key, value) ->
value != null && extractColumn(sourceSchema, keyIndexInValue, value) != null
).selectKey((key, value) ->
StructKeyUtil.asStructKey(
extractColumn(sourceSchema, keyIndexInValue, value).toString()
)
).mapValues((key, row) -> {
if (updateRowKey) {
final Object rowKey = key.get(key.schema().fields().get(0));
row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey);
}
return row;
});
}

private static Object extractColumn(
final LogicalSchema schema,
final int keyIndexInValue,
final GenericRow value
) {
if (value.getColumns().size() != schema.value().size()) {
throw new IllegalStateException("Field count mismatch. "
+ "Schema fields: " + schema
+ ", row:" + value);
}
return value
.getColumns()
.get(keyIndexInValue);
}
}
Loading

0 comments on commit f312fcc

Please sign in to comment.