fix: do not filter out rows where PARTITION BY resolves to null (#4823)
* fix: fix repartition semantics

Fixes: #4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see, the schema of S2 is still the same. However, the old data in the key has been lost, as it has been overwritten with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with the new primitive key [work to remove the restriction on key column naming](#3536), the same query semantics do not work. For example:

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!
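The clash can be illustrated with a small, self-contained Java sketch. It is not ksql code: `naiveColumns` and `clashes` are hypothetical helpers that model a schema as a plain list of column names (key first, then values) and apply the naive "rename the key, leave the values alone" rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class NaivePartitionBySketch {

  /**
   * Naive rule (assumption for illustration): the key column takes the name of
   * the PARTITION BY column and the value columns are left untouched.
   * Returns the key column first, followed by the value columns.
   */
  static List<String> naiveColumns(String partitionBy, List<String> valueColumns) {
    List<String> columns = new ArrayList<>();
    columns.add(partitionBy);
    columns.addAll(valueColumns);
    return columns;
  }

  /** Returns the column names that occur more than once, in first-seen order. */
  static Set<String> clashes(List<String> columns) {
    Set<String> seen = new LinkedHashSet<>();
    Set<String> duplicated = new LinkedHashSet<>();
    for (String name : columns) {
      if (!seen.add(name)) {
        duplicated.add(name);
      }
    }
    return duplicated;
  }

  public static void main(String[] args) {
    // S1: A => B, C with PARTITION BY B
    List<String> s2 = naiveColumns("B", Arrays.asList("B", "C"));
    // prints "[B, B, C] clashes on [B]"
    System.out.println(s2 + " clashes on " + clashes(s2));
  }
}
```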

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column moves the old key column into the value and moves the new key column from the value into the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksql will synthesize a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.
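The two cases above can be sketched in plain Java. This is only an illustration of the column-moving rule as described in this message, not the actual ksql implementation: `repartition` is a hypothetical helper, schemas are modelled as lists of column names (key first), anything that is not the name of a value column is treated as an expression, and the synthesized name is hard-coded to `KSQL_COL_0`. Partitioning by the existing key column is not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RepartitionSchemaSketch {

  /**
   * Returns the resulting column names: the new key first, then the values.
   *
   * If partitionBy names a value column, that column becomes the key and is
   * removed from the values. Otherwise it is treated as an expression and a
   * synthesized key name is used, leaving the values intact. In both cases
   * the old key column is appended to the values.
   */
  static List<String> repartition(String oldKey, List<String> values, String partitionBy) {
    List<String> newValues = new ArrayList<>(values);

    // remove(Object) returns true only if the column was present in the values.
    String newKey = newValues.remove(partitionBy) ? partitionBy : "KSQL_COL_0";

    newValues.add(oldKey);

    List<String> result = new ArrayList<>();
    result.add(newKey);
    result.addAll(newValues);
    return result;
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("B", "C");

    // S1: A => B, C with PARTITION BY B; prints "[B, C, A]"
    System.out.println(repartition("A", values, "B"));

    // S1: A => B, C with PARTITION BY CAST(B AS INT); prints "[KSQL_COL_0, B, C, A]"
    System.out.println(repartition("A", values, "CAST(B AS INT)"));
  }
}
```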

[This GitHub issue](#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.

* fix: do not filter out rows where PARTITION BY resolves to null

Fixes: #4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.
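The behavioural difference can be sketched with plain Java collections. This is a toy model under stated assumptions, not the ksql or Kafka Streams code: `evaluate` and `repartition` are hypothetical helpers, rows are simple strings, and the `filterNulls` flag stands in for the filter this commit removes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class NullPartitionBySketch {

  /** Evaluates the PARTITION BY expression; any evaluation error yields null. */
  static <V> Object evaluate(Function<V, Object> expression, V row) {
    try {
      return expression.apply(row);
    } catch (RuntimeException e) {
      return null;
    }
  }

  /**
   * Re-keys each row by the evaluated expression.
   * filterNulls = true models the old behaviour (rows with a null key are dropped);
   * filterNulls = false models the new behaviour (rows are kept with a null key).
   */
  static <V> List<Map.Entry<Object, V>> repartition(
      List<V> rows, Function<V, Object> expression, boolean filterNulls) {
    List<Map.Entry<Object, V>> out = new ArrayList<>();
    for (V row : rows) {
      Object newKey = evaluate(expression, row);
      if (filterNulls && newKey == null) {
        continue; // old behaviour: silently drop the row
      }
      out.add(new SimpleEntry<>(newKey, row)); // SimpleEntry permits null keys and values
    }
    return out;
  }

  public static void main(String[] args) {
    // Toy rows of the form "id,name"; the expression picks the name field.
    // A null row makes the expression throw, which evaluate() maps to null.
    List<String> rows = Arrays.asList("5,zero", "4,", null);
    Function<String, Object> name = row -> {
      String[] parts = row.split(",");
      return parts.length > 1 ? parts[1] : null;
    };

    // prints "old: 1 row(s), new: 3 row(s)"
    System.out.println("old: " + repartition(rows, name, true).size()
        + " row(s), new: " + repartition(rows, name, false).size() + " row(s)");
  }
}
```

With the filter removed, downstream consumers need to be prepared for rows with a null key.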

* docs: call out limitation of partition by NULL
big-andy-coates authored Mar 23, 2020
1 parent 9a26ac3 commit e75a792
Showing 13 changed files with 227 additions and 181 deletions.
6 changes: 6 additions & 0 deletions docs-md/developer-guide/joins/partition-data.md
@@ -153,6 +153,12 @@ only if they are also both in the same partition after the repartition.
Otherwise, Kafka is likely to interleave messages. The use case will determine
if these ordering guarantees are acceptable.

!!! important
If the PARTITION BY expression evaluates to NULL, the resulting row is produced to a
random partition. You may want to use [COALESCE](../syntax-reference#coalesce) to wrap
the expression and convert any NULL values to a default value, for example,
`PARTITION BY COALESCE(MY_UDF_THAT_MAY_FAIL(Col0), 0)`.

For example, if you need to re-partition a stream to be keyed by a `product_id`
field, and keys need to be distributed over 6 partitions to make a join work,
use the following SQL statement:
@@ -116,10 +116,8 @@ private void logProcessingError(
final Exception e,
final GenericRow row
) {
processingLogger.error(
RecordProcessingError
.recordProcessingError(errorMsg + e.getMessage(), e, row)
);
processingLogger.error(RecordProcessingError
.recordProcessingError(errorMsg + e.getMessage(), e, row));
}
}
}
@@ -257,14 +257,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": null, "F2": null}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": null, "F2": null}, "timestamp": 11000},
@@ -309,14 +309,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000},
@@ -136,6 +136,26 @@
]
}
},
{
"name": "nulls",
"statements": [
"CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar) with (kafka_topic='test_topic', value_format = 'delimited');",
"CREATE STREAM REPARTITIONED AS select id from TEST partition by name;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": null},
{"topic": "test_topic", "key": 2, "value": "4,"},
{"topic": "test_topic", "key": 3, "value": "5,zero"}
],
"outputs": [
{"topic": "REPARTITIONED", "key": null, "value": null},
{"topic": "REPARTITIONED", "key": null, "value": "4"},
{"topic": "REPARTITIONED", "key": "zero", "value": "5"}
]
},
{
"name": "partition by with projection select some",
"statements": [
@@ -384,6 +384,51 @@
{"topic": "OUTPUT", "key": 0.11, "value": "3"},
{"topic": "OUTPUT", "key": 1.1, "value": "4"}
]
},
{
"name": "project nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT name FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": null, "value": {"NAME": "null key"}},
{"topic": "OUTPUT", "key": 2, "value": {"NAME": null}},
{"topic": "OUTPUT", "key": 3, "value": null},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
},
{
"name": "filter nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM INPUT WHERE ID IS NOT NULL AND NAME IS NOT NULL;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
}
]
}
@@ -15,7 +15,7 @@

package io.confluent.ksql.parser.json;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -26,7 +26,6 @@
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

public class KsqlTypesSerdeModuleTest {
@@ -20,35 +20,27 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;

public final class PartitionByParams {

private final LogicalSchema schema;
private BiPredicate<Struct, GenericRow> predicate;
private BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper;
private final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper;

public PartitionByParams(
final LogicalSchema schema,
final BiPredicate<Struct, GenericRow> predicate,
final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper
final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper
) {
this.schema = requireNonNull(schema, "schema");
this.predicate = requireNonNull(predicate, "predicate");
this.mapper = requireNonNull(mapper, "mapper");
}

public LogicalSchema getSchema() {
return schema;
}

public BiPredicate<Struct, GenericRow> getPredicate() {
return predicate;
}

public BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
public BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
return mapper;
}
}
@@ -35,7 +35,6 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;
@@ -85,7 +84,10 @@ public static PartitionByParams build(
final LogicalSchema resultSchema =
buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol);

return buildMapper(resultSchema, partitionByCol, evaluator);
final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper =
buildMapper(resultSchema, partitionByCol, evaluator);

return new PartitionByParams(resultSchema, mapper);
}

public static LogicalSchema buildSchema(
@@ -143,7 +145,7 @@ private static Optional<Column> getPartitionByCol(
return Optional.of(column);
}

private static PartitionByParams buildMapper(
private static BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> buildMapper(
final LogicalSchema resultSchema,
final Optional<Column> partitionByCol,
final Function<GenericRow, Object> evaluator
@@ -155,15 +157,7 @@ private static PartitionByParams buildMapper(

final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema);

final BiPredicate<Struct, GenericRow> predicate = (k, v) -> {
if (v == null) {
return false;
}

return evaluator.apply(v) != null;
};

final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper = (k, v) -> {
return (k, v) -> {
final Object newKey = evaluator.apply(v);
final Struct structKey = keyBuilder.build(newKey);

@@ -173,8 +167,6 @@ private static PartitionByParams buildMapper(

return new KeyValue<>(structKey, v);
};

return new PartitionByParams(resultSchema, predicate, mapper);
}

private static Function<GenericRow, Object> buildExpressionEvaluator(