Skip to content

Commit

Permalink
fix: rebased and changes for refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Oct 4, 2019
1 parent c1c4c8c commit 689fc83
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public void init() {
queryBuilder,
mock(SqlPredicateFactory.class),
mock(AggregateParams.Factory.class),
new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory)
new StreamsFactories(mockGroupedFactory, mockJoinedFactory, mockMaterializedFactory, mockStreamJoinedFactory)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.streams.StreamJoinedFactory;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.ColumnRef;
Expand Down Expand Up @@ -205,7 +206,8 @@ public void init() {
new StreamsFactories(
groupedFactory,
mock(JoinedFactory.class),
mock(MaterializedFactory.class)
mock(MaterializedFactory.class),
mock(StreamJoinedFactory.class)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public <K> KStreamHolder<K> visitStreamStreamJoin(final StreamStreamJoin<K> join
right,
join,
queryBuilder,
streamsFactories.getJoinedFactory()
streamsFactories.getStreamJoinedFactory()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void init() {
new StreamsFactories(
mock(GroupedFactory.class),
mock(JoinedFactory.class),
materializedFactory
materializedFactory,
mock(StreamJoinedFactory.class)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public void init() {
new StreamsFactories(
groupedFactory,
mock(JoinedFactory.class),
mock(MaterializedFactory.class)
mock(MaterializedFactory.class),
mock(StreamJoinedFactory.class)
)
);
streamGroupByKey = new StreamGroupByKey(PROPERTIES, sourceStep, FORMATS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -130,7 +131,7 @@ public void init() {
.thenReturn(leftSerde);
when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.AVRO)), any(), any()))
.thenReturn(rightSerde);
when(streamJoinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined);
when(streamJoinedFactory.create(any(Serde.class), any(Serde.class), any(Serde.class), anyString(), anyString())).thenReturn(joined);
when(left.build(any())).thenReturn(
new KStreamHolder<>(leftKStream, keySerdeFactory));
when(right.build(any())).thenReturn(
Expand All @@ -141,15 +142,16 @@ public void init() {
mock(AggregateParams.Factory.class),
new StreamsFactories(
mock(GroupedFactory.class),
streamJoinedFactory,
mock(MaterializedFactory.class)
mock(JoinedFactory.class),
mock(MaterializedFactory.class),
streamJoinedFactory
)
);
}

@SuppressWarnings("unchecked")
private void givenLeftJoin() {
when(leftKStream.leftJoin(any(KStream.class), any(), any(), any())).thenReturn(resultKStream);
when(leftKStream.leftJoin(any(KStream.class), any(), any(), any(StreamJoined.class))).thenReturn(resultKStream);
join = new StreamStreamJoin<>(
new DefaultExecutionStepProperties(SCHEMA, CTX),
JoinType.LEFT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void init() {
new StreamsFactories(
mock(GroupedFactory.class),
joinedFactory,
mock(MaterializedFactory.class)
mock(MaterializedFactory.class),
mock(StreamJoinedFactory.class)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public void setup() {
new StreamsFactories(
mock(GroupedFactory.class),
mock(JoinedFactory.class),
materializedFactory
materializedFactory,
mock(StreamJoinedFactory.class)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void init() {
new StreamsFactories(
mock(GroupedFactory.class),
mock(JoinedFactory.class),
materializedFactory
materializedFactory,
mock(StreamJoinedFactory.class)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public void init() {
new StreamsFactories(
groupedFactory,
mock(JoinedFactory.class),
mock(MaterializedFactory.class)
mock(MaterializedFactory.class),
mock(StreamJoinedFactory.class)
)
);
}
Expand Down

0 comments on commit 689fc83

Please sign in to comment.