Skip to content

Commit

Permalink
refactor: use query context to build node names (#4007)
Browse files Browse the repository at this point in the history
Changes how we build kstreams node names: they are now derived from the query
context instead of the index-based name generator. This has a couple of benefits.
It makes it easier to keep the names consistent, since we no longer have to worry
about generation order. It also simplifies the step schemas.

This patch also includes cosmetic changes to the node names. Specifically,
they have all been converted to upper camel case (e.g. "WhereFilter", "GroupBy").
  • Loading branch information
rodesai authored Dec 6, 2019
1 parent f09f797 commit f90ac26
Show file tree
Hide file tree
Showing 505 changed files with 6,510 additions and 7,929 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private static FilterNode buildFilterNode(
final PlanNode sourcePlanNode,
final Expression filterExpression
) {
return new FilterNode(new PlanNodeId("Filter"), sourcePlanNode, filterExpression);
return new FilterNode(new PlanNodeId("WhereFilter"), sourcePlanNode, filterExpression);
}

private static RepartitionNode buildRepartitionNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ public class AggregateNode extends PlanNode {

private static final String INTERNAL_COLUMN_NAME_PREFIX = "KSQL_INTERNAL_COL_";

private static final String PREPARE_OP_NAME = "prepare";
private static final String AGGREGATION_OP_NAME = "aggregate";
private static final String GROUP_BY_OP_NAME = "groupby";
private static final String HAVING_FILTER_OP_NAME = "having-filter";
private static final String PROJECT_OP_NAME = "project";

private static final String PRE_AGGR_SELECT_NODE_NAME = "PRE-AGGREGATE-SELECT";
private static final String PREPARE_OP_NAME = "Prepare";
private static final String AGGREGATION_OP_NAME = "Aggregate";
private static final String GROUP_BY_OP_NAME = "GroupBy";
private static final String HAVING_FILTER_OP_NAME = "HavingFilter";
private static final String PROJECT_OP_NAME = "Project";

private final PlanNode source;
private final LogicalSchema schema;
Expand Down Expand Up @@ -192,7 +190,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {

final SchemaKStream<?> aggregateArgExpanded = sourceSchemaKStream.select(
internalSchema.getAggArgExpansionList(),
PRE_AGGR_SELECT_NODE_NAME,
contextStacker.push(PREPARE_OP_NAME),
builder
);
Expand Down Expand Up @@ -240,7 +237,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
if (havingExpression.isPresent()) {
aggregated = aggregated.filter(
havingExpression.get(),
HAVING_FILTER_OP_NAME.toUpperCase(),
contextStacker.push(HAVING_FILTER_OP_NAME)
);
}
Expand All @@ -250,7 +246,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {

return aggregated.select(
finalSelects,
ProjectNode.SELECT_NODE_NAME,
contextStacker.push(PROJECT_OP_NAME),
builder
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@Immutable
public class DataSourceNode extends PlanNode {

private static final String SOURCE_OP_NAME = "source";
private static final String SOURCE_OP_NAME = "Source";

private final DataSource<?> dataSource;
private final SourceName alias;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
@Immutable
public class FilterNode extends PlanNode {

private static final String WHERE_FILTER_OP_NAME = "WHERE-FILTER";
private final PlanNode source;
private final Expression predicate;
private final ImmutableList<SelectExpression> selectExpressions;
Expand Down Expand Up @@ -93,8 +92,7 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
return getSource().buildStream(builder)
.filter(
getPredicate(),
WHERE_FILTER_OP_NAME,
contextStacker.push(WHERE_FILTER_OP_NAME.toLowerCase())
contextStacker
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
@Immutable
public class ProjectNode extends PlanNode {

static final String SELECT_NODE_NAME = "SELECT";

private final PlanNode source;
private final LogicalSchema schema;
private final ImmutableList<SelectExpression> projectExpressions;
Expand Down Expand Up @@ -102,7 +100,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
return getSource().buildStream(builder)
.select(
getSelectExpressions(),
SELECT_NODE_NAME,
builder.buildNodeContext(getId().toString()),
builder
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ public SchemaKStream<K> into(

public SchemaKStream<K> filter(
final Expression filterExpression,
final String stepName,
final Stacker contextStacker
) {
final StreamFilter<K> step = ExecutionStepFactory.streamFilter(
contextStacker,
sourceStep,
rewriteTimeComparisonForFilter(filterExpression),
stepName
rewriteTimeComparisonForFilter(filterExpression)
);

return new SchemaKStream<>(
Expand All @@ -143,16 +141,14 @@ static Expression rewriteTimeComparisonForFilter(final Expression expression) {

public SchemaKStream<K> select(
final List<SelectExpression> selectExpressions,
final String selectNodeName,
final QueryContext.Stacker contextStacker,
final KsqlQueryBuilder ksqlQueryBuilder
) {
final KeyField keyField = findKeyField(selectExpressions);
final StreamMapValues<K> step = ExecutionStepFactory.streamMapValues(
contextStacker,
sourceStep,
selectExpressions,
selectNodeName
selectExpressions
);

return new SchemaKStream<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ public SchemaKTable<K> into(
@Override
public SchemaKTable<K> filter(
final Expression filterExpression,
final String stepName,
final Stacker contextStacker
) {
final TableFilter<K> step = ExecutionStepFactory.tableFilter(
contextStacker,
sourceTableStep,
rewriteTimeComparisonForFilter(filterExpression),
stepName
rewriteTimeComparisonForFilter(filterExpression)
);

return new SchemaKTable<>(
Expand All @@ -116,16 +114,14 @@ public SchemaKTable<K> filter(
@Override
public SchemaKTable<K> select(
final List<SelectExpression> selectExpressions,
final String selectNodeName,
final QueryContext.Stacker contextStacker,
final KsqlQueryBuilder ksqlQueryBuilder
) {
final KeyField keyField = findKeyField(selectExpressions);
final TableMapValues<K> step = ExecutionStepFactory.tableMapValues(
contextStacker,
sourceTableStep,
selectExpressions,
selectNodeName
selectExpressions
);

return new SchemaKTable<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void shouldCreateExecutionPlanForInsert() {
Assert.assertEquals(lines[1],
"\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.Project");
Assert.assertEquals(lines[2],
"\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic.source");
"\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic.Source");
assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class));
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata)
queryMetadataList.get(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ public class AggregateNodeTest {
private final ProcessingLogContext processingLogContext = ProcessingLogContext.create();
private final QueryId queryId = new QueryId("queryid");

@Before
public void setUp() {
when(ksqlStreamBuilder.buildUniqueNodeName(any())).thenAnswer(inv -> inv.getArgument(0));
}

@Test
public void shouldBuildSourceNode() {
// When:
Expand Down Expand Up @@ -210,19 +205,19 @@ public void shouldHaveSourceNodeForSecondSubtopolgyWithKsqlNameForRepartition()

// Then:
final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(
builder.build(), "Aggregate-groupby-repartition-source");
builder.build(), "Aggregate-GroupBy-repartition-source");
final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
assertThat(node.predecessors(), equalTo(Collections.emptySet()));
assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000005")));
assertThat(node.topicSet(), containsInAnyOrder("Aggregate-groupby-repartition"));
assertThat(node.topicSet(), containsInAnyOrder("Aggregate-GroupBy-repartition"));
}

@Test
public void shouldHaveKsqlNameForAggregationStateStore() {
build();
final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(
builder.build(), "KSTREAM-AGGREGATE-0000000004");
assertThat(node.stores(), hasItem(equalTo("Aggregate-aggregate")));
assertThat(node.stores(), hasItem(equalTo("Aggregate-Aggregate-Materialize")));
}

@Test
Expand All @@ -234,9 +229,9 @@ public void shouldHaveSinkNodeWithSameTopicAsSecondSource() {

// Then:
final TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(),
"Aggregate-groupby-repartition-sink");
"Aggregate-GroupBy-repartition-sink");
final TopologyDescription.Source source = (TopologyDescription.Source) getNodeByName(
builder.build(), "Aggregate-groupby-repartition-source");
builder.build(), "Aggregate-GroupBy-repartition-source");
assertThat(sink.successors(), equalTo(Collections.emptySet()));
assertThat(source.topicSet(), hasItem(sink.topic()));
}
Expand Down Expand Up @@ -304,9 +299,9 @@ public void shouldCreateLoggers() {
.collect(Collectors.toList());

assertThat(loggers, contains(
"queryid.KsqlTopic.source",
"queryid.Aggregate.groupby",
"queryid.Aggregate.aggregate"
"queryid.KsqlTopic.Source",
"queryid.Aggregate.GroupBy",
"queryid.Aggregate.Aggregate.Materialize"
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public void shouldBuildSourceStreamWithCorrectParams() {
);
assertThat(
stackerCaptor.getValue().getQueryContext().getContext(),
equalTo(ImmutableList.of("0", "source"))
equalTo(ImmutableList.of("0", "Source"))
);
}

Expand All @@ -332,7 +332,7 @@ public void shouldBuildSourceStreamWithCorrectParamsWhenBuildingTable() {
);
assertThat(
stackerCaptor.getValue().getQueryContext().getContext(),
equalTo(ImmutableList.of("0", "source"))
equalTo(ImmutableList.of("0", "Source"))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -45,8 +46,6 @@ public class FilterNodeTest {
private KsqlQueryBuilder ksqlStreamBuilder;
@Mock
private Stacker stacker;
@Mock
private Stacker updatedStacker;

private FilterNode node;

Expand All @@ -59,11 +58,10 @@ public void setup() {
when(sourceNode.buildStream(any()))
.thenReturn(schemaKStream);
when(sourceNode.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(schemaKStream.filter(any(), any(), any()))
when(schemaKStream.filter(any(), any()))
.thenReturn(schemaKStream);

when(ksqlStreamBuilder.buildNodeContext(nodeId.toString())).thenReturn(stacker);
when(stacker.push(any())).thenReturn(updatedStacker);

node = new FilterNode(nodeId, sourceNode, predicate);
}
Expand All @@ -75,7 +73,7 @@ public void shouldApplyFilterCorrectly() {

// Then:
verify(sourceNode).buildStream(ksqlStreamBuilder);
verify(schemaKStream).filter(predicate, "WHERE-FILTER", updatedStacker);
verify(schemaKStream).filter(predicate, stacker);
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_INFERRED")
Expand All @@ -85,6 +83,6 @@ public void shouldUseRightOpName() {
node.buildStream(ksqlStreamBuilder);

// Then:
verify(stacker).push("where-filter");
verifyNoMoreInteractions(stacker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void shouldHaveLeftJoin() {
= (TopologyDescription.Processor) getNodeByName(topology, "Join");
final List<String> predecessors = leftJoin.predecessors().stream()
.map(TopologyDescription.Node::name).collect(Collectors.toList());
assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KafkaTopic_Right-reduce")));
assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KafkaTopic_Right-Reduce")));
assertThat(predecessors, equalTo(Collections.singletonList("Join-repartition-source")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
@RunWith(MockitoJUnitRunner.class)
public class KsqlBareOutputNodeTest {

private static final String FILTER_NODE = "WHERE-FILTER-unique";
private static final String FILTER_MAPVALUES_NODE = "SELECT-unique";
private static final String FILTER_NODE = "WhereFilter";
private static final String FILTER_MAPVALUES_NODE = "Project";
private static final String SIMPLE_SELECT_WITH_FILTER = "SELECT col0, col2, col3 FROM test1 WHERE col0 > 100 EMIT CHANGES;";

private SchemaKStream stream;
Expand Down Expand Up @@ -93,7 +93,6 @@ public void before() {
new QueryContext.Stacker()
.push(inv.getArgument(0).toString()));
when(ksqlStreamBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
when(ksqlStreamBuilder.buildUniqueNodeName(any())).thenAnswer(inv -> inv.getArgument(0) + "-unique");

final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil
.buildLogicalPlan(ksqlConfig, SIMPLE_SELECT_WITH_FILTER, metaStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void init() {
when(source.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(source.getSelectExpressions()).thenReturn(ImmutableList.of(SELECT_0, SELECT_1));
when(ksqlStreamBuilder.buildNodeContext(NODE_ID.toString())).thenReturn(stacker);
when(stream.select(any(), any(), any(), any())).thenReturn((SchemaKStream) stream);
when(stream.select(any(), any(), any())).thenReturn((SchemaKStream) stream);

projectNode = new ProjectNode(
NODE_ID,
Expand Down Expand Up @@ -126,7 +126,6 @@ public void shouldCreateProjectionWithFieldNameExpressionPairs() {
// Then:
verify(stream).select(
eq(ImmutableList.of(SELECT_0, SELECT_1)),
eq("SELECT"),
eq(stacker),
same(ksqlStreamBuilder)
);
Expand Down
Loading

0 comments on commit f90ac26

Please sign in to comment.