Skip to content

Commit

Permalink
chore: address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jan 12, 2021
1 parent b35ea2b commit 82937ff
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,9 @@ String getName() {
public static final Set<String> SSL_CONFIG_NAMES = sslConfigNames();
public static final Set<String> STREAM_TOPIC_CONFIG_NAMES = streamTopicConfigNames();

public static final KsqlConfig EMPTY = new KsqlConfig(ImmutableMap.of());
public static KsqlConfig empty() {
return new KsqlConfig(ImmutableMap.of());
}

private static ConfigDef configDef(final ConfigGeneration generation) {
return generation == ConfigGeneration.CURRENT ? CURRENT_DEF : LEGACY_DEF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private List<SqlType> buildAllParams(
allParams.add(primitiveType);
} catch (final Exception e) {
throw new KsqlFunctionException("Only primitive init arguments are supported by UDAF "
+ getName() + ", but got " + arg);
+ getName() + ", but got " + arg, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ private static final class Collect<T> implements TableUdaf<T, List<T>, List<T>>,
public void configure(final Map<String, ?> map) {
final Object limit = map.get(LIMIT_CONFIG);
this.limit = (limit == null) ? this.limit : ((Number) limit).intValue();

if (this.limit < 0) {
this.limit = Integer.MAX_VALUE;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ private static final class Collect<T> implements Udaf<T, List<T>, List<T>>, Conf
public void configure(final Map<String, ?> map) {
final Object limit = map.get(LIMIT_CONFIG);
this.limit = (limit == null) ? this.limit : ((Number) limit).intValue();

if (this.limit < 0) {
this.limit = Integer.MAX_VALUE;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ private SchemaKTable<?> aggregate(
functions,
windowExpression,
valueFormat.getFormatInfo(),
aggregationContext,
ksqlConfig
aggregationContext
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public SchemaKTable<?> aggregate(
final List<FunctionCall> aggregations,
final Optional<WindowExpression> windowExpression,
final FormatInfo valueFormat,
final Stacker contextStacker,
final KsqlConfig config
final Stacker contextStacker
) {
final ExecutionStep<? extends KTableHolder<?>> step;
final KeyFormat keyFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ public SchemaKTable<GenericKey> aggregate(
final List<FunctionCall> aggregations,
final Optional<WindowExpression> windowExpression,
final FormatInfo valueFormat,
final Stacker contextStacker,
final KsqlConfig config
final Stacker contextStacker
) {
if (windowExpression.isPresent()) {
throw new KsqlException("Windowing not supported for table aggregations.");
}

final List<String> unsupportedFunctionNames = aggregations.stream()
.map(call -> UdafUtil.resolveAggregateFunction(functionRegistry, call, schema, config))
.map(call -> UdafUtil.resolveAggregateFunction(functionRegistry, call, schema, ksqlConfig))
.filter(function -> !(function instanceof TableAggregationFunction))
.map(KsqlAggregateFunction::name)
.map(FunctionName::text)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.math.BigDecimal;
Expand Down Expand Up @@ -58,7 +59,7 @@ public void shouldAppendInitParamTypesWhenLookingUpFunction() {
// When:
functionFactory.createAggregateFunction(
ImmutableList.of(SqlTypes.STRING),
new AggregateFunctionInitArguments(0, ImmutableList.of(1))
new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(1))
);

// Then:
Expand All @@ -70,7 +71,7 @@ public void shouldHandleNullLiteralParams() {
// When:
functionFactory.createAggregateFunction(
ImmutableList.of(SqlTypes.STRING),
new AggregateFunctionInitArguments(0, Arrays.asList(null, 5L))
new AggregateFunctionInitArguments(0, ImmutableMap.of(), Arrays.asList(null, 5L))
);

// Then:
Expand All @@ -82,7 +83,7 @@ public void shouldHandleInitParamsOfAllPrimitiveTypes() {
// When:
functionFactory.createAggregateFunction(
ImmutableList.of(SqlTypes.STRING),
new AggregateFunctionInitArguments(0, ImmutableList.of(true, 1, 1L, 1.0d, "s"))
new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(true, 1, 1L, 1.0d, "s"))
);

// Then: did not throw.
Expand All @@ -94,7 +95,7 @@ public void shouldThrowOnUnsupportedInitParamType() {
final Exception e = assertThrows(KsqlFunctionException.class,
() -> functionFactory.createAggregateFunction(
ImmutableList.of(SqlTypes.STRING),
new AggregateFunctionInitArguments(0, ImmutableList.of(BigDecimal.ONE))
new AggregateFunctionInitArguments(0, ImmutableMap.of(), ImmutableList.of(BigDecimal.ONE))
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ public void shouldRespectSizeLimit() {
assertThat(runningList, not(hasItem(11)));
}

@Test
public void shouldIgnoreNegativeLimit() {
final TableUdaf<Integer, List<Integer>, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
((Configurable) udaf).configure(ImmutableMap.of(CollectListUdaf.LIMIT_CONFIG, -10));

List<Integer> runningList = udaf.initialize();
for (int i = 1; i <= 25; i++) {
runningList = udaf.aggregate(i, runningList);
}

assertThat(runningList, hasSize(25));
}

@Test
public void shouldUndo() {
final TableUdaf<Integer, List<Integer>, List<Integer>> udaf = CollectListUdaf.createCollectListInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,17 @@ public void shouldRespectSizeLimit() {
assertThat(runningList, not(hasItem(1001)));
}

@Test
public void shouldIgnoreNegativeLimit() {
final Udaf<Integer, List<Integer>, List<Integer>> udaf = CollectSetUdaf.createCollectSetInt();
((Configurable) udaf).configure(ImmutableMap.of(CollectSetUdaf.LIMIT_CONFIG, -1));

List<Integer> runningList = udaf.initialize();
for (int i = 1; i <= 25; i++) {
runningList = udaf.aggregate(i, runningList);
}

assertThat(runningList, hasSize(25));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void shouldReturnKTableWithOutputSchema() {
ImmutableList.of(AGG),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(result.getSchema(), is(OUT_SCHEMA));
Expand All @@ -146,8 +146,8 @@ public void shouldBuildStepForAggregate() {
ImmutableList.of(AGG),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand Down Expand Up @@ -176,8 +176,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeaturesInherited() {
ImmutableList.of(AGG),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand Down Expand Up @@ -211,8 +211,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeaturesWrappingIsNotSe
ImmutableList.of(AGG),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand All @@ -237,8 +237,8 @@ public void shouldBuildStepForWindowedAggregate() {
ImmutableList.of(AGG),
Optional.of(windowExp),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public void shouldFailWindowedTableAggregation() {
ImmutableList.of(SUM, COUNT),
Optional.of(windowExp),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY)
queryContext
)
);

// Then:
Expand All @@ -122,8 +122,8 @@ public void shouldFailUnsupportedAggregateFunction() {
ImmutableList.of(MIN, MAX),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY)
queryContext
)
);

// Then:
Expand Down Expand Up @@ -153,8 +153,8 @@ public void shouldBuildStepForAggregate() {
ImmutableList.of(SUM, COUNT),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand Down Expand Up @@ -184,8 +184,8 @@ public void shouldBuildStepForAggregateWithKeyFormatSerdeFeatures() {
ImmutableList.of(SUM, COUNT),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(
Expand Down Expand Up @@ -213,8 +213,8 @@ public void shouldReturnKTableWithOutputSchema() {
ImmutableList.of(SUM, COUNT),
Optional.empty(),
valueFormat.getFormatInfo(),
queryContext,
KsqlConfig.EMPTY);
queryContext
);

// Then:
assertThat(result.getSchema(), is(OUT_SCHEMA));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public Void visitFunctionCall(
// use an empty KsqlConfig here because the expression type
// of an aggregate function does not depend on the configuration
final AggregateFunctionInitArguments args =
UdafUtil.createAggregateFunctionInitArgs(0, node, KsqlConfig.EMPTY);
UdafUtil.createAggregateFunctionInitArgs(0, node, KsqlConfig.empty());

final KsqlAggregateFunction<?,?,?> aggFunc = functionRegistry
.getAggregateFunction(node.getName(), schema, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void init() {
public void shouldResolveUDAF() {
// When:
final KsqlAggregateFunction returned =
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY);
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty());

// Then:
assertThat(returned, is(function));
Expand All @@ -92,7 +92,7 @@ public void shouldResolveUDAF() {
@Test
public void shouldGetAggregateWithCorrectName() {
// When:
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY);
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty());

// Then:
verify(functionRegistry).getAggregateFunction(eq(FUNCTION_NAME), any(), any());
Expand All @@ -101,7 +101,7 @@ public void shouldGetAggregateWithCorrectName() {
@Test
public void shouldGetAggregateWithCorrectType() {
// When:
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.EMPTY);
UdafUtil.resolveAggregateFunction(functionRegistry, FUNCTION_CALL, SCHEMA, KsqlConfig.empty());

// Then:
verify(functionRegistry).getAggregateFunction(any(), eq(SqlTypes.BIGINT), any());
Expand All @@ -117,7 +117,7 @@ public void shouldNotThrowIfFirstParamNotALiteral() {
));

// When:
UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY);
UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty());

// Then: did not throw.
}
Expand All @@ -134,7 +134,7 @@ public void shouldThrowIfSecondParamIsNotALiteral() {
// When:
final Exception e = assertThrows(
KsqlException.class,
() -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY)
() -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty())
);

// Then:
Expand All @@ -155,7 +155,7 @@ public void shouldThrowIfSubsequentParamsAreNotLiteral() {
// When:
final Exception e = assertThrows(
KsqlException.class,
() -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.EMPTY)
() -> UdafUtil.createAggregateFunctionInitArgs(0, functionCall, KsqlConfig.empty())
);

// Then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void init() {
functionRegistry,
FUNCTIONS,
false,
KsqlConfig.EMPTY
KsqlConfig.empty()
);
}

Expand All @@ -133,7 +133,7 @@ public void shouldCreateUndoAggregatorWithCorrectParams() {
NON_AGG_COLUMNS,
functionRegistry,
ImmutableList.of(TABLE_AGG),
KsqlConfig.EMPTY
KsqlConfig.empty()
);

// Then:
Expand Down Expand Up @@ -178,7 +178,7 @@ public void shouldReturnUndoAggregator() {
NON_AGG_COLUMNS,
functionRegistry,
ImmutableList.of(TABLE_AGG),
KsqlConfig.EMPTY
KsqlConfig.empty()
);

// When:
Expand Down Expand Up @@ -237,7 +237,7 @@ public void shouldReturnCorrectWindowedSchema() {
functionRegistry,
FUNCTIONS,
true,
KsqlConfig.EMPTY
KsqlConfig.empty()
);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void init() {
when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde);
when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.EMPTY);
when(queryBuilder.getKsqlConfig()).thenReturn(KsqlConfig.empty());
when(aggregateParamsFactory.create(any(), any(), any(), any(), anyBoolean(), any()))
.thenReturn(aggregateParams);
when(aggregateParams.getAggregator()).thenReturn((KudafAggregator) aggregator);
Expand Down Expand Up @@ -432,7 +432,7 @@ public void shouldBuildAggregatorParamsCorrectlyForUnwindowedAggregate() {
functionRegistry,
FUNCTIONS,
false,
KsqlConfig.EMPTY
KsqlConfig.empty()
);
}

Expand Down Expand Up @@ -671,7 +671,7 @@ public void shouldBuildAggregatorParamsCorrectlyForWindowedAggregate() {

// Then:
verify(aggregateParamsFactory)
.create(INPUT_SCHEMA, NON_AGG_COLUMNS, functionRegistry, FUNCTIONS, true, KsqlConfig.EMPTY);
.create(INPUT_SCHEMA, NON_AGG_COLUMNS, functionRegistry, FUNCTIONS, true, KsqlConfig.empty());
}
}

Expand Down
Loading

0 comments on commit 82937ff

Please sign in to comment.