Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move selectKey impl to plan builder #3362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.util.StructKeyUtil;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.StreamFilter;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamToTable;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StreamFilterBuilder;
import io.confluent.ksql.execution.streams.StreamMapValuesBuilder;
import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder;
import io.confluent.ksql.execution.streams.StreamSourceBuilder;
import io.confluent.ksql.execution.streams.StreamToTableBuilder;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
Expand Down Expand Up @@ -697,34 +700,19 @@ public SchemaKStream<Struct> selectKey(
);
}

final int keyIndexInValue = getSchema().valueColumnIndex(proposedKey.fullName())
.orElseThrow(IllegalStateException::new);

final KStream keyedKStream = kstream
.filter((key, value) -> value != null && extractColumn(keyIndexInValue, value) != null)
.selectKey((key, value) ->
StructKeyUtil.asStructKey(extractColumn(keyIndexInValue, value).toString()))
.mapValues((key, row) -> {
if (updateRowKey) {
final Object rowKey = key.get(key.schema().fields().get(0));
row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey);
}
return row;
});

final KeyField newKeyField = getSchema().isMetaColumn(fieldName)
? resultantKeyField.withName(Optional.empty())
: resultantKeyField;

final KeySerde<Struct> selectKeySerde = keySerde.rebind(StructKeyUtil.ROWKEY_SERIALIZED_SCHEMA);
final ExecutionStep<KStream<K, GenericRow>> step = ExecutionStepFactory.streamSelectKey(
final StreamSelectKey<K> step = ExecutionStepFactory.streamSelectKey(
contextStacker,
sourceStep,
fieldName,
updateRowKey
);
return (SchemaKStream<Struct>) new SchemaKStream(
keyedKStream,
return new SchemaKStream<>(
StreamSelectKeyBuilder.build(kstream, step),
step,
keyFormat,
selectKeySerde,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.streams.TableFilterBuilder;
import io.confluent.ksql.execution.streams.TableMapValuesBuilder;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
import io.confluent.ksql.execution.util.StructKeyUtil;
import java.util.Collections;
import org.apache.kafka.connect.data.Struct;
import org.easymock.EasyMock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;

@Immutable
public class StreamSelectKey<S> implements ExecutionStep<S> {
public class StreamSelectKey<K> implements ExecutionStep<KStream<Struct, GenericRow>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need K here? Can we not just use ExecutionStep<KStream<?, GenericRow>> for the source param / field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't pass an ExecutionStep<KStream<K, GenericRow>>, which is what SchemaKStream has, to ExecutionStep<KStream<?, GenericRow>>. We could make the type ExecutionStep<? extends KStream<?, GenericRow>>, but this felt easier to read.

private final ExecutionStepProperties properties;
private final ExecutionStep<S> source;
private final ExecutionStep<KStream<K, GenericRow>> source;
private final String fieldName;
private final boolean updateRowKey;

public StreamSelectKey(
final ExecutionStepProperties properties,
final ExecutionStep<S> source,
final ExecutionStep<KStream<K, GenericRow>> source,
final String fieldName,
final boolean updateRowKey) {
this.properties = Objects.requireNonNull(properties, "properties");
Expand All @@ -48,8 +51,16 @@ public List<ExecutionStep<?>> getSources() {
return Collections.singletonList(source);
}

public boolean isUpdateRowKey() {
return updateRowKey;
}

public String getFieldName() {
return fieldName;
}

@Override
public S build(final KsqlQueryBuilder streamsBuilder) {
public KStream<Struct, GenericRow> build(final KsqlQueryBuilder streamsBuilder) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.structured;
package io.confluent.ksql.execution.util;

import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.SchemaUtil;
Expand All @@ -25,7 +25,7 @@
/**
* Helper for dealing with Struct keys.
*/
final class StructKeyUtil {
public final class StructKeyUtil {

private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder
.struct()
Expand All @@ -35,15 +35,15 @@ final class StructKeyUtil {
private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD =
ROWKEY_STRUCT_SCHEMA.fields().get(0);

static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from(
public static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from(
(ConnectSchema) ROWKEY_STRUCT_SCHEMA,
false
);

private StructKeyUtil() {
}

static Struct asStructKey(final String rowKey) {
public static Struct asStructKey(final String rowKey) {
final Struct keyStruct = new Struct(ROWKEY_STRUCT_SCHEMA);
keyStruct.put(ROWKEY_FIELD, rowKey);
return keyStruct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public static <K> StreamStreamJoin<KStream<K, GenericRow>> streamStreamJoin(
);
}

public static <K> StreamSelectKey<KStream<K, GenericRow>> streamSelectKey(
@SuppressWarnings("unchecked")
public static <K> StreamSelectKey<K> streamSelectKey(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<K, GenericRow>> source,
final String fieldName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.SchemaUtil;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Builds the Kafka Streams operations for a {@link StreamSelectKey} execution step.
 *
 * <p>The built stream is re-keyed on the string form of one of the source's value columns.
 */
public final class StreamSelectKeyBuilder {
  private StreamSelectKeyBuilder() {
  }

  /**
   * Re-keys {@code kstream} on the value column named by {@code selectKey.getFieldName()}.
   *
   * <p>Records whose value is {@code null}, or whose key column is {@code null}, are filtered
   * out. For the remaining records the new key is the {@code Struct} produced by
   * {@link StructKeyUtil#asStructKey(String)} from the key column's {@code toString()}. When
   * {@code selectKey.isUpdateRowKey()} is set, the row is modified in place so that the column
   * at {@link SchemaUtil#ROWKEY_INDEX} holds the new key.
   *
   * @param kstream the stream to re-key
   * @param selectKey the step describing the key field and whether to update the row key;
   *     the schema is taken from the properties of its first source
   * @return the filtered, re-keyed stream
   * @throws IllegalArgumentException if the schema has no value column matching the field name
   * @throws IllegalStateException if the matched column has no index within the value columns
   */
  public static KStream<Struct, GenericRow> build(
      final KStream<?, GenericRow> kstream,
      final StreamSelectKey<?> selectKey) {
    final LogicalSchema sourceSchema = selectKey.getSources().get(0).getProperties().getSchema();
    final String fieldName = selectKey.getFieldName();
    // Name the field and schema in both failures so a bad plan can be diagnosed from the message.
    final Column keyColumn = sourceSchema.findValueColumn(fieldName)
        .orElseThrow(() -> new IllegalArgumentException(
            "Unknown key field: " + fieldName + ", schema: " + sourceSchema));
    final int keyIndexInValue = sourceSchema.valueColumnIndex(keyColumn.fullName())
        .orElseThrow(() -> new IllegalStateException(
            "No value column index for key field: " + fieldName + ", schema: " + sourceSchema));
    final boolean updateRowKey = selectKey.isUpdateRowKey();
    return kstream
        // The key is derived via toString() below, so rows without a usable key column
        // must be removed first.
        .filter((key, value) ->
            value != null && extractColumn(sourceSchema, keyIndexInValue, value) != null
        ).selectKey((key, value) ->
            StructKeyUtil.asStructKey(
                extractColumn(sourceSchema, keyIndexInValue, value).toString()
            )
        ).mapValues((key, row) -> {
          if (updateRowKey) {
            // Copy the first field of the new key into the row's ROWKEY slot (mutates the row).
            final Object rowKey = key.get(key.schema().fields().get(0));
            row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey);
          }
          return row;
        });
  }

  /**
   * Returns the column at {@code keyIndexInValue} of {@code value}.
   *
   * @throws IllegalStateException if the row's column count differs from the number of value
   *     columns in {@code schema}
   */
  private static Object extractColumn(
      final LogicalSchema schema,
      final int keyIndexInValue,
      final GenericRow value
  ) {
    if (value.getColumns().size() != schema.value().size()) {
      throw new IllegalStateException("Field count mismatch. "
          + "Schema fields: " + schema
          + ", row:" + value);
    }
    return value
        .getColumns()
        .get(keyIndexInValue);
  }
}
Loading