diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 991ccdc7646a..cce8e033c92f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -276,7 +276,7 @@ public void init() { queryBuilder, mock(SqlPredicateFactory.class), mock(AggregateParams.Factory.class), - new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory) + new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory, mockStreamJoinedFactory) ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 7b3f05a808a5..f46b0f33cc29 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.execution.streams.StreamJoinedFactory; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.ColumnRef; @@ -205,7 +206,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index e1734c2b82d7..eb18a96207f5 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -153,7 +153,7 @@ public KStreamHolder visitStreamStreamJoin(final StreamStreamJoin join right, join, queryBuilder, - streamsFactories.getJoinedFactory() + streamsFactories.getStreamJoinedFactory() ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index c01b4d74334e..61554f96bdbd 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -197,7 +197,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java index dbf42c9e83ab..493b18c18330 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java @@ -152,7 +152,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); streamGroupByKey = new StreamGroupByKey(PROPERTIES, sourceStep, FORMATS); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java index 57c3bb6bd8d7..1948d0f7637d 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java @@ -3,6 +3,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -130,7 +131,7 @@ public void init() { .thenReturn(leftSerde); when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.AVRO)), any(), any())) .thenReturn(rightSerde); - when(streamJoinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined); + when(streamJoinedFactory.create(any(Serde.class), any(Serde.class), any(Serde.class), anyString(), anyString())).thenReturn(joined); when(left.build(any())).thenReturn( new KStreamHolder<>(leftKStream, keySerdeFactory)); when(right.build(any())).thenReturn( @@ -141,15 +142,16 @@ public void init() { mock(AggregateParams.Factory.class), new StreamsFactories( mock(GroupedFactory.class), - streamJoinedFactory, - mock(MaterializedFactory.class) + mock(JoinedFactory.class), + mock(MaterializedFactory.class), + streamJoinedFactory ) ); } @SuppressWarnings("unchecked") private void givenLeftJoin() { - when(leftKStream.leftJoin(any(KStream.class), any(), any(), any())).thenReturn(resultKStream); + when(leftKStream.leftJoin(any(KStream.class), any(), any(), any(StreamJoined.class))).thenReturn(resultKStream); join = new StreamStreamJoin<>( new DefaultExecutionStepProperties(SCHEMA, CTX), JoinType.LEFT, diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java index f866398f3628..0c1be1d6f407 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java @@ -135,7 +135,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), joinedFactory, - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java index f7203cd6a154..462e6aa3fed2 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java @@ -137,7 +137,8 @@ public void setup() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index 3b58b7caaf95..7578fc6ea665 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -172,7 +172,8 @@ public void init() { new StreamsFactories( mock(GroupedFactory.class), mock(JoinedFactory.class), - materializedFactory + materializedFactory, + mock(StreamJoinedFactory.class) ) ); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java index 17bf4c0a74e1..785012bc5183 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java @@ -150,7 +150,8 @@ public void init() { new StreamsFactories( groupedFactory, mock(JoinedFactory.class), - mock(MaterializedFactory.class) + mock(MaterializedFactory.class), + mock(StreamJoinedFactory.class) ) ); }