From 82937ffdbfaaa0dd2a1c3079369c0f38db23c0ab Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 12 Jan 2021 10:55:27 -0800 Subject: [PATCH] chore: address feedback --- .../io/confluent/ksql/util/KsqlConfig.java | 4 +++- .../UdafAggregateFunctionFactory.java | 2 +- .../function/udaf/array/CollectListUdaf.java | 4 ++++ .../function/udaf/array/CollectSetUdaf.java | 4 ++++ .../ksql/planner/plan/AggregateNode.java | 3 +-- .../ksql/structured/SchemaKGroupedStream.java | 3 +-- .../ksql/structured/SchemaKGroupedTable.java | 5 ++--- .../UdafAggregateFunctionFactoryTest.java | 9 +++++---- .../udaf/array/CollectListUdafTest.java | 13 ++++++++++++ .../udaf/array/CollectSetUdafTest.java | 13 ++++++++++++ .../structured/SchemaKGroupedStreamTest.java | 20 +++++++++---------- .../structured/SchemaKGroupedTableTest.java | 20 +++++++++---------- .../execution/util/ExpressionTypeManager.java | 2 +- .../ksql/execution/function/UdafUtilTest.java | 12 +++++------ .../streams/AggregateParamsFactoryTest.java | 8 ++++---- .../streams/StreamAggregateBuilderTest.java | 6 +++--- .../streams/TableAggregateBuilderTest.java | 4 ++-- 17 files changed, 83 insertions(+), 49 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 30ed01a9d5f1..41eb3130cfae 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -509,7 +509,9 @@ String getName() { public static final Set SSL_CONFIG_NAMES = sslConfigNames(); public static final Set STREAM_TOPIC_CONFIG_NAMES = streamTopicConfigNames(); - public static final KsqlConfig EMPTY = new KsqlConfig(ImmutableMap.of()); + public static KsqlConfig empty() { + return new KsqlConfig(ImmutableMap.of()); + } private static ConfigDef configDef(final ConfigGeneration generation) { return generation == ConfigGeneration.CURRENT ? CURRENT_DEF : LEGACY_DEF; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java index 6967311e3cd6..1578a9b7dda1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunctionFactory.java @@ -100,7 +100,7 @@ private List buildAllParams( allParams.add(primitiveType); } catch (final Exception e) { throw new KsqlFunctionException("Only primitive init arguments are supported by UDAF " - + getName() + ", but got " + arg); + + getName() + ", but got " + arg, e); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java index 697c113666a1..9d698d62ddc2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java @@ -75,6 +75,10 @@ private static final class Collect implements TableUdaf, List>, public void configure(final Map map) { final Object limit = map.get(LIMIT_CONFIG); this.limit = (limit == null) ? this.limit : ((Number) limit).intValue(); + + if (this.limit < 0) { + this.limit = Integer.MAX_VALUE; + } } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectSetUdaf.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectSetUdaf.java index 14dabc650034..495c6cb542d7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectSetUdaf.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectSetUdaf.java @@ -73,6 +73,10 @@ private static final class Collect implements Udaf, List>, Conf public void configure(final Map map) { final Object limit = map.get(LIMIT_CONFIG); this.limit = (limit == null) ? this.limit : ((Number) limit).intValue(); + + if (this.limit < 0) { + this.limit = Integer.MAX_VALUE; + } } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index f4d7e8df4870..50e1dcf1f3bc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -216,8 +216,7 @@ private SchemaKTable aggregate( functions, windowExpression, valueFormat.getFormatInfo(), - aggregationContext, - ksqlConfig + aggregationContext ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 716c946e6e2f..0e9484a2879c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -67,8 +67,7 @@ public SchemaKTable aggregate( final List aggregations, final Optional windowExpression, final FormatInfo valueFormat, - final Stacker contextStacker, - final KsqlConfig config + final Stacker contextStacker ) { final ExecutionStep> step; final KeyFormat keyFormat; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java index 14a5f79337f9..f0c0c8f6d491 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java @@ -73,15 +73,14 @@ public SchemaKTable aggregate( final List aggregations, final Optional windowExpression, final FormatInfo valueFormat, - final Stacker contextStacker, - final KsqlConfig config + final Stacker contextStacker ) { if (windowExpression.isPresent()) { throw new KsqlException("Windowing not supported for table aggregations."); } final List unsupportedFunctionNames = aggregations.stream() - .map(call -> UdafUtil.resolveAggregateFunction(functionRegistry, call, schema, config)) + .map(call -> UdafUtil.resolveAggregateFunction(functionRegistry, call, schema, ksqlConfig)) .filter(function -> !(function instanceof TableAggregationFunction)) .map(KsqlAggregateFunction::name) .map(FunctionName::text) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdafAggregateFunctionFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdafAggregateFunctionFactoryTest.java index 6480e54276e2..5b33c5c4f72c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdafAggregateFunctionFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdafAggregateFunctionFactoryTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.math.BigDecimal; @@ -58,7 +59,7 @@ public void shouldAppendInitParamTypesWhenLookingUpFunction() { // When: functionFactory.createAggregateFunction( ImmutableList.of(SqlTypes.STRING), - new AggregateFunctionInitArguments(0, ImmutableList.of(1)) + new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(1)) ); // Then: @@ -70,7 +71,7 @@ public void shouldHandleNullLiteralParams() { // When: functionFactory.createAggregateFunction( ImmutableList.of(SqlTypes.STRING), - new AggregateFunctionInitArguments(0, Arrays.asList(null, 5L)) + new AggregateFunctionInitArguments(0, ImmutableMap.of(), Arrays.asList(null, 5L)) ); // Then: @@ -82,7 +83,7 @@ public void shouldHandleInitParamsOfAllPrimitiveTypes() { // When: functionFactory.createAggregateFunction( ImmutableList.of(SqlTypes.STRING), - new AggregateFunctionInitArguments(0, ImmutableList.of(true, 1, 1L, 1.0d, "s")) + new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(true, 1, 1L, 1.0d, "s")) ); // Then: did not throw. @@ -94,7 +95,7 @@ public void shouldThrowOnUnsupportedInitParamType() { final Exception e = assertThrows(KsqlFunctionException.class, () -> functionFactory.createAggregateFunction( ImmutableList.of(SqlTypes.STRING), - new AggregateFunctionInitArguments(0, ImmutableList.of(BigDecimal.ONE)) + new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(BigDecimal.ONE)) ) ); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java index d481d70bf6b3..03d264b64428 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectListUdafTest.java @@ -78,6 +78,19 @@ public void shouldRespectSizeLimit() { assertThat(runningList, not(hasItem(11))); } + @Test + public void shouldIgnoreNegativeLimit() { + final TableUdaf, List> udaf = CollectListUdaf.createCollectListInt(); + ((Configurable) udaf).configure(ImmutableMap.of(CollectListUdaf.LIMIT_CONFIG, -10)); + + List runningList = udaf.initialize(); + for (int i = 1; i <= 25; i++) { + runningList = udaf.aggregate(i, runningList); + } + + assertThat(runningList, hasSize(25)); + } + @Test public void shouldUndo() { final TableUdaf, List> udaf = CollectListUdaf.createCollectListInt(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectSetUdafTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectSetUdafTest.java index ff395a7e1322..895ce8262db2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectSetUdafTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/array/CollectSetUdafTest.java @@ -76,4 +76,17 @@ public void shouldRespectSizeLimit() { assertThat(runningList, not(hasItem(1001))); } + @Test + public void shouldIgnoreNegativeLimit() { + final Udaf, List> udaf = CollectSetUdaf.createCollectSetInt(); + ((Configurable) udaf).configure(ImmutableMap.of(CollectSetUdaf.LIMIT_CONFIG, -1)); + + List runningList = udaf.initialize(); + for (int i = 1; i <= 25; i++) { + runningList = udaf.aggregate(i, runningList); + } + + assertThat(runningList, hasSize(25)); + } + } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index 54ef47ce0a0d..13651eda01b9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -131,8 +131,8 @@ public void shouldReturnKTableWithOutputSchema() { ImmutableList.of(AGG), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat(result.getSchema(), is(OUT_SCHEMA)); @@ -146,8 +146,8 @@ public void shouldBuildStepForAggregate() { ImmutableList.of(AGG), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( @@ -176,8 +176,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeaturesInherited() { ImmutableList.of(AGG), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( @@ -211,8 +211,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeaturesWrappingIsNotSe ImmutableList.of(AGG), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( @@ -237,8 +237,8 @@ public void shouldBuildStepForWindowedAggregate() { ImmutableList.of(AGG), Optional.of(windowExp), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java index 2df9c1336f16..9cee6079316e 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java @@ -101,8 +101,8 @@ public void shouldFailWindowedTableAggregation() { ImmutableList.of(SUM, COUNT), Optional.of(windowExp), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY) + queryContext + ) ); // Then: @@ -122,8 +122,8 @@ public void shouldFailUnsupportedAggregateFunction() { ImmutableList.of(MIN, MAX), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY) + queryContext + ) ); // Then: @@ -153,8 +153,8 @@ public void shouldBuildStepForAggregate() { ImmutableList.of(SUM, COUNT), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( @@ -184,8 +184,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeatures() { ImmutableList.of(SUM, COUNT), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat( @@ -213,8 +213,8 @@ public void shouldReturnKTableWithOutputSchema() { ImmutableList.of(SUM, COUNT), Optional.empty(), valueFormat.getFormatInfo(), - queryContext, - KsqlConfig.EMPTY); + queryContext + ); // Then: assertThat(result.getSchema(), is(OUT_SCHEMA)); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 6e5aa17bbc98..4ba02c7fef00 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -424,7 +424,7 @@ public Void visitFunctionCall( // use an empty KsqlConfig here because the expression type // of an aggregate function does not depend on the configuration final AggregateFunctionInitArguments args = - UdafUtil.createAggregateFunctionInitArgs(0, node, KsqlConfig.EMPTY); + UdafUtil.createAggregateFunctionInitArgs(0, node, KsqlConfig.empty()); final KsqlAggregateFunction aggFunc = functionRegistry .getAggregateFunction(node.getName(), schema, args); diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdafUtilTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdafUtilTest.java index a9778eb99a10..051a9ea7d01f 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdafUtilTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/function/UdafUtilTest.java @@ -83,7 +83,7 @@ public void init() { public void shouldResolveUDAF() { // When: final KsqlAggregateFunction returned = - UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY); + UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty()); // Then: assertThat(returned, is(function)); @@ -92,7 +92,7 @@ public void shouldResolveUDAF() { @Test public void shouldGetAggregateWithCorrectName() { // When: - UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY); + UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty()); // Then: verify(functionRegistry).getAggregateFunction(eq(FUNCTION_NAME), any(), any()); @@ -101,7 +101,7 @@ public void shouldGetAggregateWithCorrectName() { @Test public void shouldGetAggregateWithCorrectType() { // When: - UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY); + UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty()); // Then: verify(functionRegistry).getAggregateFunction(any(), eq(SqlTypes.BIGINT), any()); @@ -117,7 +117,7 @@ public void shouldNotThrowIfFirstParamNotALiteral() { )); // When: - UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY); + UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty()); // Then: did not throw. } @@ -134,7 +134,7 @@ public void shouldThrowIfSecondParamIsNotALiteral() { // When: final Exception e = assertThrows( KsqlException.class, - () -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY) + () -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty()) ); // Then: @@ -155,7 +155,7 @@ public void shouldThrowIfSubsequentParamsAreNotLiteral() { // When: final Exception e = assertThrows( KsqlException.class, - () -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY) + () -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty()) ); // Then: diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java index a8d65f62d721..ffd2b002b341 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java @@ -115,7 +115,7 @@ public void init() { functionRegistry, FUNCTIONS, false, - KsqlConfig.EMPTY + KsqlConfig.empty() ); } @@ -133,7 +133,7 @@ public void shouldCreateUndoAggregatorWithCorrectParams() { NON_AGG_COLUMNS, functionRegistry, ImmutableList.of(TABLE_AGG), - KsqlConfig.EMPTY + KsqlConfig.empty() ); // Then: @@ -178,7 +178,7 @@ public void shouldReturnUndoAggregator() { NON_AGG_COLUMNS, functionRegistry, ImmutableList.of(TABLE_AGG), - KsqlConfig.EMPTY + KsqlConfig.empty() ); // When: @@ -237,7 +237,7 @@ public void shouldReturnCorrectWindowedSchema() { functionRegistry, FUNCTIONS, true, - KsqlConfig.EMPTY + KsqlConfig.empty() ); // When: diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index c15783641d4f..c3f6c09627f2 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -210,7 +210,7 @@ public void init() { when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde); when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry); - when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.EMPTY); + when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.empty()); when(aggregateParamsFactory.create(any(), any(), any(), any(), anyBoolean(), any())) .thenReturn(aggregateParams); when(aggregateParams.getAggregator()).thenReturn((KudafAggregator) aggregator); @@ -432,7 +432,7 @@ public void shouldBuildAggregatorParamsCorrectlyForUnwindowedAggregate() { functionRegistry, FUNCTIONS, false, - KsqlConfig.EMPTY + KsqlConfig.empty() ); } @@ -671,7 +671,7 @@ public void shouldBuildAggregatorParamsCorrectlyForWindowedAggregate() { // Then: verify(aggregateParamsFactory) - .create(INPUT_SCHEMA, NON_AGG_COLUMNS, functionRegistry, FUNCTIONS, true, KsqlConfig.EMPTY); + .create(INPUT_SCHEMA, NON_AGG_COLUMNS, functionRegistry, FUNCTIONS, true, KsqlConfig.empty()); } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index 41225a1c65ad..45767c87c595 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -160,7 +160,7 @@ public void init() { when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde); when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry); - when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.EMPTY); + when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.empty()); when(aggregateParamsFactory.createUndoable(any(), any(), any(), any(), any())) .thenReturn(aggregateParams); when(aggregateParams.getAggregator()).thenReturn((KudafAggregator)aggregator); @@ -270,7 +270,7 @@ public void shouldBuildAggregatorParamsCorrectlyForAggregate() { NON_AGG_COLUMNS, functionRegistry, FUNCTIONS, - KsqlConfig.EMPTY + KsqlConfig.empty() ); }