diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 3981d8e326df..2cce54da554b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -44,6 +44,12 @@ import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.execution.streams.StreamJoinedFactory; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.schema.ksql.ColumnRef; +import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; import io.confluent.ksql.execution.plan.PlanBuilder; import io.confluent.ksql.execution.plan.ExecutionStep; @@ -119,6 +125,8 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -143,6 +151,8 @@ public class SchemaKStreamTest { "group", Serdes.String(), Serdes.String()); private final Joined joined = Joined.with( Serdes.String(), Serdes.String(), Serdes.String(), "join"); + private final StreamJoined streamJoined = StreamJoined.with( + Serdes.String(), Serdes.String(), Serdes.String()).withName("join"); private final KeyField validJoinKeyField = KeyField.of( Optional.of(ColumnRef.of(SourceName.of("left"), ColumnName.of("COL1"))), metaStore.getSource(SourceName.of("TEST1")) @@ -179,6 +189,8 @@ public class SchemaKStreamTest { @Mock private MaterializedFactory mockMaterializedFactory; @Mock + private StreamJoinedFactory mockStreamJoinedFactory; + @Mock private KStream mockKStream; @Mock private KeySerde keySerde; @@ -206,6 +218,7 @@ public void init() { when(mockGroupedFactory.create(anyString(), any(Serde.class), any(Serde.class))) .thenReturn(grouped); when(mockJoinedFactory.create(any(), any(), any(), anyString())).thenReturn(joined); + when(mockStreamJoinedFactory.create(any(), any(), any(), anyString(), anyString())).thenReturn(streamJoined); final KsqlStream secondKsqlStream = (KsqlStream) metaStore.getSource(SourceName.of("ORDERS")); secondKStream = builder @@ -264,7 +277,7 @@ public void init() { queryBuilder, mock(SqlPredicateFactory.class), mock(AggregateParams.Factory.class), - new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory) + new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory, mockStreamJoinedFactory) ); } @@ -879,7 +892,7 @@ public void shouldPerformStreamToStreamLeftJoin() { any(KStream.class), any(KsqlValueJoiner.class), any(JoinWindows.class), - any(Joined.class)) + any(StreamJoined.class)) ).thenReturn(mockKStream); when(queryBuilder.buildValueSerde(any(), any(), any())) .thenReturn(leftSerde) @@ -900,12 +913,12 @@ public void shouldPerformStreamToStreamLeftJoin() { // Then: joinedKStream.getSourceStep().build(planBuilder); - verifyCreateJoined(rightSerde); + verifyCreateStreamJoined(rightSerde); verify(mockKStream).leftJoin( eq(secondKStream), any(KsqlValueJoiner.class), eq(joinWindow), - same(joined) + same(streamJoined) ); assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); @@ -983,7 +996,7 @@ public void shouldPerformStreamToStreamInnerJoin() { any(KStream.class), any(KsqlValueJoiner.class), any(JoinWindows.class), - any(Joined.class)) + any(StreamJoined.class)) ).thenReturn(mockKStream); when(queryBuilder.buildValueSerde(any(), any(), any())) .thenReturn(leftSerde) @@ -1004,12 +1017,12 @@ public void shouldPerformStreamToStreamInnerJoin() { // Then: joinedKStream.getSourceStep().build(planBuilder); - verifyCreateJoined(rightSerde); + verifyCreateStreamJoined(rightSerde); verify(mockKStream).join( eq(secondKStream), any(KsqlValueJoiner.class), eq(joinWindow), - same(joined) + same(streamJoined) ); assertThat(joinedKStream, instanceOf(SchemaKStream.class)); @@ -1031,7 +1044,7 @@ public void shouldPerformStreamToStreamOuterJoin() { any(KStream.class), any(KsqlValueJoiner.class), any(JoinWindows.class), - any(Joined.class)) + any(StreamJoined.class)) ).thenReturn(mockKStream); when(queryBuilder.buildValueSerde(any(), any(), any())) .thenReturn(leftSerde) @@ -1052,12 +1065,12 @@ public void shouldPerformStreamToStreamOuterJoin() { // Then: joinedKStream.getSourceStep().build(planBuilder); - verifyCreateJoined(rightSerde); + verifyCreateStreamJoined(rightSerde); verify(mockKStream).outerJoin( eq(secondKStream), any(KsqlValueJoiner.class), eq(joinWindow), - same(joined) + same(streamJoined) ); assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); @@ -1311,6 +1324,16 @@ private void givenSourcePropertiesWithSchema( ); } + private void verifyCreateStreamJoined(final Serde rightSerde) { + verify(mockStreamJoinedFactory).create( + same(keySerde), + same(leftSerde), + same(rightSerde), + eq(StreamsUtil.buildOpName(childContextStacker.getQueryContext())), + eq(StreamsUtil.buildOpName(childContextStacker.getQueryContext())) + ); + } + private SchemaKStream buildSchemaKStream( final LogicalSchema schema, final KeyField keyField, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 62de0c7dcb37..ce781ef7a41e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.execution.streams.StreamJoinedFactory; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.ColumnRef; @@ -206,7 +207,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index 02e9692c8b3a..83a772924076 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -64,11 +64,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.hamcrest.Matcher; import org.hamcrest.StringDescription; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling +@SuppressWarnings("deprecation") public class TestExecutor implements Closeable { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling @@ -333,7 +333,8 @@ private static void processSingleRecord( : recordTopic.getValueSerializer(schemaRegistryClient); final Object key = getKey(inputRecord); - final ConsumerRecord consumerRecord = new ConsumerRecordFactory<>( + final ConsumerRecord consumerRecord = + new org.apache.kafka.streams.test.ConsumerRecordFactory<>( keySerializer, valueSerializer ).create( diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java index 3e9c40edb8f6..b49d29213ecc 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java @@ -67,6 +67,7 @@ import org.apache.kafka.streams.TopologyTestDriver; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling +@SuppressWarnings("deprecation") public final class TestExecutorUtil { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index 5df35e50ae81..0bfb5f729e3d 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -160,7 +160,7 @@ public KStreamHolder visitStreamStreamJoin(final StreamStreamJoin join right, join, queryBuilder, - streamsFactories.getJoinedFactory() + streamsFactories.getStreamJoinedFactory() ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java new file mode 100644 index 000000000000..3832711fa42d --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.util.KsqlConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.StreamJoined; + +public interface StreamJoinedFactory { + StreamJoined create( + Serde keySerde, + Serde leftSerde, + Serde rightSerde, + String name, + String storeName); + + + static StreamJoinedFactory create(final KsqlConfig ksqlConfig) { + if (StreamsUtil.useProvidedName(ksqlConfig)) { + return new StreamJoinedFactory() { + @Override + public StreamJoined create( + final Serde keySerde, + final Serde leftSerde, + final Serde rightSerde, + final String name, + final String storeName) { + return StreamJoined.with(keySerde, leftSerde, rightSerde) + .withName(name).withStoreName(storeName); + } + }; + } + return new StreamJoinedFactory() { + @Override + public StreamJoined create( + final Serde keySerde, + final Serde leftSerde, + final Serde rightSerde, + final String name, + final String storeName) { + return StreamJoined.with(keySerde, leftSerde, rightSerde); + } + }; + } +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java index df19c5413106..9a214b479e53 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java @@ -26,8 +26,8 @@ import io.confluent.ksql.serde.KeySerde; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; public final class StreamStreamJoinBuilder { private static final String LEFT_SERDE_CTX = "left"; @@ -41,7 +41,7 @@ public static KStreamHolder build( final KStreamHolder right, final StreamStreamJoin join, final KsqlQueryBuilder queryBuilder, - final JoinedFactory joinedFactory) { + final StreamJoinedFactory streamJoinedFactory) { final Formats leftFormats = join.getLeftFormats(); final QueryContext queryContext = join.getProperties().getQueryContext(); final QueryContext.Stacker stacker = QueryContext.Stacker.of(queryContext); @@ -71,10 +71,11 @@ public static KStreamHolder build( leftPhysicalSchema, queryContext ); - final Joined joined = joinedFactory.create( + final StreamJoined joined = streamJoinedFactory.create( keySerde, leftSerde, rightSerde, + StreamsUtil.buildOpName(queryContext), StreamsUtil.buildOpName(queryContext) ); final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java index 5e7ff5ad2fd4..a1b361773620 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java @@ -22,23 +22,27 @@ public class StreamsFactories { private final GroupedFactory groupedFactory; private final JoinedFactory joinedFactory; private final MaterializedFactory materializedFactory; + private final StreamJoinedFactory streamJoinedFactory; public static StreamsFactories create(final KsqlConfig ksqlConfig) { Objects.requireNonNull(ksqlConfig); return new StreamsFactories( GroupedFactory.create(ksqlConfig), JoinedFactory.create(ksqlConfig), - MaterializedFactory.create(ksqlConfig) + MaterializedFactory.create(ksqlConfig), + StreamJoinedFactory.create(ksqlConfig) ); } public StreamsFactories( final GroupedFactory groupedFactory, final JoinedFactory joinedFactory, - final MaterializedFactory materializedFactory) { + final MaterializedFactory materializedFactory, + final StreamJoinedFactory streamJoinedFactory) { this.groupedFactory = Objects.requireNonNull(groupedFactory); this.joinedFactory = Objects.requireNonNull(joinedFactory); this.materializedFactory = Objects.requireNonNull(materializedFactory); + this.streamJoinedFactory = Objects.requireNonNull(streamJoinedFactory); } public GroupedFactory getGroupedFactory() { @@ -52,4 +56,8 @@ public JoinedFactory getJoinedFactory() { public MaterializedFactory getMaterializedFactory() { return materializedFactory; } + + public StreamJoinedFactory getStreamJoinedFactory() { + return streamJoinedFactory; + } } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 6daddc53f26c..052e67004ff5 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -197,7 +197,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java index 4d11ae55bea5..a3d7cb63e173 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java @@ -152,7 +152,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); streamGroupByKey = new StreamGroupByKey(PROPERTIES, sourceStep, FORMATS); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java index 025208de274a..5a06e8668eac 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java @@ -3,6 +3,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -10,6 +11,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; @@ -37,8 +39,8 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -101,9 +103,9 @@ public class StreamStreamJoinBuilderTest { @Mock private ExecutionStep> right; @Mock - private Joined joined; + private StreamJoined joined; @Mock - private JoinedFactory joinedFactory; + private StreamJoinedFactory streamJoinedFactory; @Mock private KsqlQueryBuilder queryBuilder; @Mock @@ -128,7 +130,7 @@ public void init() { .thenReturn(leftSerde); when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.AVRO)), any(), any())) .thenReturn(rightSerde); - when(joinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined); + when(streamJoinedFactory.create(any(Serde.class), any(Serde.class), any(Serde.class), anyString(), anyString())).thenReturn(joined); when(left.build(any())).thenReturn( new KStreamHolder<>(leftKStream, keySerdeFactory)); when(right.build(any())).thenReturn( @@ -139,15 +141,16 @@ public void init() { mock(AggregateParams.Factory.class), new StreamsFactories( mock(GroupedFactory.class), - joinedFactory, - mock(MaterializedFactory.class) + mock(JoinedFactory.class), + mock(MaterializedFactory.class), + streamJoinedFactory ) ); } @SuppressWarnings("unchecked") private void givenLeftJoin() { - when(leftKStream.leftJoin(any(KStream.class), any(), any(), any())).thenReturn(resultKStream); + when(leftKStream.leftJoin(any(KStream.class), any(), any(), any(StreamJoined.class))).thenReturn(resultKStream); join = new StreamStreamJoin<>( new DefaultExecutionStepProperties(SCHEMA, CTX), JoinType.LEFT, @@ -162,7 +165,7 @@ private void givenLeftJoin() { @SuppressWarnings("unchecked") private void givenOuterJoin() { - when(leftKStream.outerJoin(any(KStream.class), any(), any(), any())).thenReturn(resultKStream); + when(leftKStream.outerJoin(any(KStream.class), any(), any(), any(StreamJoined.class))).thenReturn(resultKStream); join = new StreamStreamJoin<>( new DefaultExecutionStepProperties(SCHEMA, CTX), JoinType.OUTER, @@ -177,7 +180,7 @@ private void givenOuterJoin() { @SuppressWarnings("unchecked") private void givenInnerJoin() { - when(leftKStream.join(any(KStream.class), any(), any(), any())).thenReturn(resultKStream); + when(leftKStream.join(any(KStream.class), any(), any(), any(StreamJoined.class))).thenReturn(resultKStream); join = new StreamStreamJoin<>( new DefaultExecutionStepProperties(SCHEMA, CTX), JoinType.INNER, @@ -251,6 +254,7 @@ public void shouldDoInnerJoin() { } @Test + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void shouldBuildJoinedCorrectly() { // Given: givenInnerJoin(); @@ -259,7 +263,7 @@ public void shouldBuildJoinedCorrectly() { join.build(planBuilder); // Then: - verify(joinedFactory).create(keySerde, leftSerde, rightSerde, "jo-in"); + verify(streamJoinedFactory).create(keySerde, leftSerde, rightSerde, "jo-in", "jo-in"); } @Test diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java index bc18605790e3..d41d7784d5a1 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java @@ -135,7 +135,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), joinedFactory, - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java index df0c5aeac08d..c705f123de6f 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java @@ -138,7 +138,8 @@ public void setup() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index 3281792032de..3265fe42aa0b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -172,7 +172,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java index 6a3502fa280b..82b3652bf30b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java @@ -151,7 +151,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); }