Skip to content

Commit

Permalink
feat: move joins to plan builder (#3361)
Browse files Browse the repository at this point in the history
Moves code for joining kstreams/ktables out of SchemaKStream/Table
and into execution plan builders. Also moves KsqlValueJoiner into
ksql-streams, so that it can be used from the plan builders. Also
adds a buildKeySerde API for building both windowed and unwindowed
keys, since most of the plan builders dont have type information
for the key.
  • Loading branch information
rodesai authored Sep 20, 2019
1 parent 6757d9f commit e243c74
Show file tree
Hide file tree
Showing 21 changed files with 1,340 additions and 345 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.planner.plan;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
Expand All @@ -25,9 +24,7 @@
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
Expand All @@ -43,18 +40,13 @@
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;


public class JoinNode extends PlanNode {

public enum JoinType {
INNER, LEFT, OUTER
}

private static final String LEFT_SERDE_CONTEXT_NAME = "left";
private static final String RIGHT_SERDE_CONTEXT_NAME = "right";

private final JoinType joinType;
private final DataSourceNode left;
private final DataSourceNode right;
Expand Down Expand Up @@ -285,25 +277,6 @@ static ValueFormat getFormatForSource(final DataSourceNode sourceNode) {
.getValueFormat();
}

Serde<GenericRow> getSerDeForSource(
final DataSourceNode sourceNode,
final QueryContext.Stacker contextStacker
) {
final ValueFormat valueFormat = getFormatForSource(sourceNode);

final LogicalSchema logicalSchema = sourceNode.getSchema()
.withoutAlias();

return builder.buildValueSerde(
valueFormat.getFormatInfo(),
PhysicalSchema.from(
logicalSchema,
SerdeOption.none()
),
contextStacker.getQueryContext()
);
}

/**
* The key field of the resultant joined stream.
*
Expand Down Expand Up @@ -336,7 +309,6 @@ static KeyField getOuterJoinedKeyField(final String leftAlias, final KeyField le
}

private static final class StreamToStreamJoiner<K> extends Joiner<K> {

private StreamToStreamJoiner(
final KsqlQueryBuilder builder,
final JoinNode joinNode,
Expand Down Expand Up @@ -370,9 +342,9 @@ public SchemaKStream<K> join() {
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
contextStacker);
contextStacker,
builder
);
case OUTER:
return leftStream.outerJoin(
rightStream,
Expand All @@ -381,9 +353,9 @@ public SchemaKStream<K> join() {
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
contextStacker);
contextStacker,
builder
);
case INNER:
return leftStream.join(
rightStream,
Expand All @@ -392,9 +364,9 @@ public SchemaKStream<K> join() {
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
contextStacker);
contextStacker,
builder
);
default:
throw new KsqlException("Invalid join type encountered: " + joinNode.joinType);
}
Expand Down Expand Up @@ -432,17 +404,19 @@ public SchemaKStream<K> join() {
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
getFormatForSource(joinNode.left),
getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
contextStacker);
contextStacker,
builder
);

case INNER:
return leftStream.join(
rightTable,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
getFormatForSource(joinNode.left),
getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
contextStacker);
contextStacker,
builder
);
case OUTER:
throw new KsqlException("Full outer joins between streams and tables are not supported.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.streams;

import io.confluent.ksql.execution.streams.GroupedFactory;
import io.confluent.ksql.execution.streams.JoinedFactory;
import io.confluent.ksql.execution.streams.MaterializedFactory;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Objects;
Expand Down
Loading

0 comments on commit e243c74

Please sign in to comment.