fix: do not filter out rows where PARTITION BY resolves to null #4823

Merged
6 changes: 6 additions & 0 deletions docs-md/developer-guide/joins/partition-data.md
Original file line number Diff line number Diff line change
@@ -153,6 +153,12 @@ only if they are also both in the same partition after the repartition.
Otherwise, Kafka is likely to interleave messages. The use case will determine
if these ordering guarantees are acceptable.

!!! important
If the PARTITION BY expression evaluates to NULL, the resulting row is produced to a
random partition. You may want to use [COALESCE](../syntax-reference#coalesce) to wrap
the expression and convert any NULL values to a default value, for example,
`PARTITION BY COALESCE(MY_UDF_THAT_MAY_FAIL(Col0), 0)`.

For example, if you need to re-partition a stream to be keyed by a `product_id`
field, and keys need to be distributed over 6 partitions to make a join work,
use the following SQL statement:
Original file line number Diff line number Diff line change
@@ -116,10 +116,8 @@ private void logProcessingError(
final Exception e,
final GenericRow row
) {
processingLogger.error(
RecordProcessingError
.recordProcessingError(errorMsg + e.getMessage(), e, row)
);
processingLogger.error(RecordProcessingError
.recordProcessingError(errorMsg + e.getMessage(), e, row));
}
}
}
Original file line number Diff line number Diff line change
@@ -257,14 +257,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
Comment from the PR author:

Note: it's safe to change these as they are from an unreleased feature...

{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": null, "F2": null}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": null, "F2": null}, "timestamp": 11000},
@@ -309,14 +309,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000},
Original file line number Diff line number Diff line change
@@ -136,6 +136,26 @@
]
}
},
{
"name": "nulls",
"statements": [
"CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar) with (kafka_topic='test_topic', value_format = 'delimited');",
"CREATE STREAM REPARTITIONED AS select id from TEST partition by name;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": null},
{"topic": "test_topic", "key": 2, "value": "4,"},
{"topic": "test_topic", "key": 3, "value": "5,zero"}
],
"outputs": [
{"topic": "REPARTITIONED", "key": null, "value": null},
{"topic": "REPARTITIONED", "key": null, "value": "4"},
{"topic": "REPARTITIONED", "key": "zero", "value": "5"}
]
},
{
"name": "partition by with projection select some",
"statements": [
Original file line number Diff line number Diff line change
@@ -384,6 +384,51 @@
{"topic": "OUTPUT", "key": 0.11, "value": "3"},
{"topic": "OUTPUT", "key": 1.1, "value": "4"}
]
},
{
"name": "project nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT name FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": null, "value": {"NAME": "null key"}},
{"topic": "OUTPUT", "key": 2, "value": {"NAME": null}},
{"topic": "OUTPUT", "key": 3, "value": null},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
},
{
"name": "filter nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM INPUT WHERE ID IS NOT NULL AND NAME IS NOT NULL;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
}
]
}
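
The `filter nulls` case above can be read as a plain predicate over records. The following is a minimal Java sketch of that predicate, not ksqlDB code: the `Rec` class and the `keep`, `sampleInputs`, and `keptIds` helpers are hypothetical names introduced for illustration, and the sketch assumes that a record whose whole value is null is treated as having a null `NAME`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FilterNullsSketch {

  // Hypothetical record shape: a nullable key and a nullable value map.
  static final class Rec {
    final Integer id;
    final Map<String, String> value;

    Rec(final Integer id, final Map<String, String> value) {
      this.id = id;
      this.value = value;
    }
  }

  // Mirrors WHERE ID IS NOT NULL AND NAME IS NOT NULL.
  // Assumption: a null value is treated as a null NAME.
  static boolean keep(final Rec rec) {
    final String name = rec.value == null ? null : rec.value.get("NAME");
    return rec.id != null && name != null;
  }

  // The five inputs of the "filter nulls" test case.
  static List<Rec> sampleInputs() {
    return Arrays.asList(
        new Rec(1, Collections.singletonMap("NAME", "Nick")),
        new Rec(null, Collections.singletonMap("NAME", "null key")),
        new Rec(2, Collections.<String, String>emptyMap()),
        new Rec(3, null),
        new Rec(4, Collections.singletonMap("NAME", "Fred")));
  }

  // Returns the keys of the records that pass the predicate, in input order.
  static List<Integer> keptIds(final List<Rec> inputs) {
    final List<Integer> kept = new ArrayList<>();
    for (final Rec rec : inputs) {
      if (keep(rec)) {
        kept.add(rec.id);
      }
    }
    return kept;
  }

  public static void main(final String[] args) {
    System.out.println(keptIds(sampleInputs()));
  }
}
```

Under these assumptions only the records with keys 1 and 4 survive, which matches the expected outputs of the test case.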
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@

package io.confluent.ksql.parser.json;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -26,7 +26,6 @@
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

public class KsqlTypesSerdeModuleTest {
Original file line number Diff line number Diff line change
@@ -20,35 +20,27 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;

public final class PartitionByParams {

private final LogicalSchema schema;
private BiPredicate<Struct, GenericRow> predicate;
private BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper;
private final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper;

public PartitionByParams(
final LogicalSchema schema,
final BiPredicate<Struct, GenericRow> predicate,
final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper
final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper
) {
this.schema = requireNonNull(schema, "schema");
this.predicate = requireNonNull(predicate, "predicate");
this.mapper = requireNonNull(mapper, "mapper");
}

public LogicalSchema getSchema() {
return schema;
}

public BiPredicate<Struct, GenericRow> getPredicate() {
return predicate;
}

public BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
public BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
return mapper;
}
}
Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;
@@ -85,7 +84,10 @@ public static PartitionByParams build(
final LogicalSchema resultSchema =
buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol);

return buildMapper(resultSchema, partitionByCol, evaluator);
final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper =
buildMapper(resultSchema, partitionByCol, evaluator);

return new PartitionByParams(resultSchema, mapper);
}

public static LogicalSchema buildSchema(
@@ -143,7 +145,7 @@ private static Optional<Column> getPartitionByCol(
return Optional.of(column);
}

private static PartitionByParams buildMapper(
private static BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> buildMapper(
final LogicalSchema resultSchema,
final Optional<Column> partitionByCol,
final Function<GenericRow, Object> evaluator
@@ -155,15 +157,7 @@ private static PartitionByParams buildMapper(

final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema);

final BiPredicate<Struct, GenericRow> predicate = (k, v) -> {
if (v == null) {
return false;
}

return evaluator.apply(v) != null;
};

final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper = (k, v) -> {
return (k, v) -> {
final Object newKey = evaluator.apply(v);
final Struct structKey = keyBuilder.build(newKey);

@@ -173,8 +167,6 @@ private static PartitionByParams buildMapper(

return new KeyValue<>(structKey, v);
};

return new PartitionByParams(resultSchema, predicate, mapper);
}

private static Function<GenericRow, Object> buildExpressionEvaluator(
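
The effect of dropping the predicate in `buildMapper` can be illustrated with a small standalone sketch. This is not the ksqlDB implementation: `Row` is a hypothetical stand-in for `GenericRow`, `java.util.Map.Entry` stands in for Kafka's `KeyValue`, and the handling of a null row (it maps to a null key) is an assumption made here so the sketch reproduces the outputs of the `nulls` test case.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.function.Function;

public class NullKeyRepartitionSketch {

  // Hypothetical stand-in for GenericRow with the two columns of the "nulls" test.
  static final class Row {
    final Long id;
    final String name;

    Row(final Long id, final String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Re-keys every row with the evaluator result. No predicate runs before the
  // mapper, so a null result yields a null key instead of dropping the row.
  static BiFunction<Object, Row, Entry<Object, Row>> buildMapper(
      final Function<Row, Object> evaluator
  ) {
    return (oldKey, row) -> {
      // Assumption: a null row evaluates to a null key.
      final Object newKey = row == null ? null : evaluator.apply(row);
      return new SimpleEntry<Object, Row>(newKey, row);
    };
  }

  // Equivalent in spirit to PARTITION BY name over the given rows.
  static List<Entry<Object, Row>> repartitionByName(final List<Row> rows) {
    final BiFunction<Object, Row, Entry<Object, Row>> mapper =
        buildMapper(row -> row.name);
    final List<Entry<Object, Row>> out = new ArrayList<>();
    for (final Row row : rows) {
      // This mapper ignores the old key, so null is passed for it.
      out.add(mapper.apply(null, row));
    }
    return out;
  }

  public static void main(final String[] args) {
    final List<Row> rows =
        Arrays.asList((Row) null, new Row(4L, null), new Row(5L, "zero"));
    for (final Entry<Object, Row> entry : repartitionByName(rows)) {
      System.out.println(entry.getKey());
    }
  }
}
```

All three input rows come out of the sketch, the first two with a null key and the third keyed by `zero`. With the removed predicate in place, the first two rows would have been filtered out.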