Skip to content

Commit

Permalink
chore: split key format feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Dec 2, 2020
1 parent 07dc0c7 commit 47a9c95
Show file tree
Hide file tree
Showing 16 changed files with 122 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,17 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_KEY_FORMAT_ENABLED = "ksql.key.format.enabled";
public static final Boolean KSQL_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_KEY_FORMAT_ENABLED_DOC =
"Feature flag for non-Kafka key formats";
"Feature flag for key formats under development";

public static final String KSQL_COMPLEX_KEY_FORMAT_ENABLED = "ksql.complex.key.format.enabled";
public static final Boolean KSQL_COMPLEX_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_COMPLEX_KEY_FORMAT_ENABLED_DOC =
"Feature flag for complex (non-primitive) keys";

public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED = "ksql.multicol.key.format.enabled";
public static final Boolean KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC =
"Feature flag for multi-column keys";

public static final String KSQL_DEFAULT_KEY_FORMAT_CONFIG = "ksql.persistence.default.format.key";
private static final String KSQL_DEFAULT_KEY_FORMAT_DEFAULT = "KAFKA";
Expand Down Expand Up @@ -613,6 +623,18 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_COMPLEX_KEY_FORMAT_ENABLED,
Type.BOOLEAN,
KSQL_COMPLEX_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_COMPLEX_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_MULTICOL_KEY_FORMAT_ENABLED,
Type.BOOLEAN,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_DEFAULT_KEY_FORMAT_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class PlannedTestUtils {
Expand All @@ -48,9 +49,10 @@ public static boolean isPlannedTestCase(final TestCase testCase) {

public static boolean isNotExcluded(final TestCase testCase) {
// Place temporary logic here to exclude test cases based on feature flags, etc.
return !(boolean) testCase
.properties()
.getOrDefault(KsqlConfig.KSQL_KEY_FORMAT_ENABLED, false);
final Map<String, Object> props = testCase.properties();
return !(boolean) props.getOrDefault(KsqlConfig.KSQL_KEY_FORMAT_ENABLED, false)
&& !(boolean) props.getOrDefault(KsqlConfig.KSQL_COMPLEX_KEY_FORMAT_ENABLED, false)
&& !(boolean) props.getOrDefault(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED, false);
}

public static boolean isSamePlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
{
"name": "ARRAY",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (ID ARRAY<VARCHAR> KEY, V0 INT, V1 INT) WITH (kafka_topic='input', format='JSON');",
Expand All @@ -174,7 +174,7 @@
{
"name": "STRUCT",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (ID STRUCT<F1 VARCHAR> KEY, V0 INT, V1 INT) WITH (kafka_topic='input', format='JSON');",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@
{
"name": "ARRAY - key - no inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -880,7 +880,7 @@
{
"name": "ARRAY - key - inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand Down Expand Up @@ -908,7 +908,7 @@
{
"name": "MAP - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');"
Expand All @@ -921,7 +921,7 @@
{
"name": "nested MAP type - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<MAP<INT, DOUBLE>> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');"
Expand All @@ -934,7 +934,7 @@
{
"name": "MAP - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -948,7 +948,7 @@
{
"name": "nested MAP type - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -962,7 +962,7 @@
{
"name": "STRUCT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F1 VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -982,7 +982,7 @@
{
"name": "STRUCT - key - inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"topics": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
{
"name": "ARRAY - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<INT> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -153,7 +153,7 @@
{
"name": "ARRAY - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (v INT) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand All @@ -167,7 +167,7 @@
{
"name": "MAP - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -180,7 +180,7 @@
{
"name": "MAP - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand All @@ -194,7 +194,7 @@
{
"name": "STRUCT - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F1 DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -207,7 +207,7 @@
{
"name": "STRUCT - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (v DOUBLE) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "windowed aggregate with struct key",
"format": ["JSON", "AVRO"],
Expand All @@ -2279,7 +2279,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "windowed aggregate with field within struct key",
"statements": [
Expand Down Expand Up @@ -2358,7 +2358,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "AVRO struct key group by primitive",
"statements": [
Expand Down Expand Up @@ -2403,7 +2403,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "AVRO group by struct",
"statements": [
Expand Down Expand Up @@ -2474,7 +2474,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by array",
"statements": [
Expand All @@ -2494,7 +2494,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct",
"statements": [
Expand All @@ -2514,7 +2514,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct convert key format",
"statements": [
Expand Down Expand Up @@ -2554,7 +2554,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct convert to incompatible key format",
"statements": [
Expand All @@ -2568,7 +2568,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "Struct key used in aggregate expression",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@
{
"name": "ARRAY - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<DOUBLE> KEY, V INT) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down Expand Up @@ -327,7 +327,7 @@
{
"name": "MAP - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='JSON');"
Expand All @@ -340,7 +340,7 @@
{
"name": "nested MAP type - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<MAP<INT, DOUBLE>> KEY, foo INT) WITH (kafka_topic='input_topic', format='JSON');"
Expand All @@ -353,7 +353,7 @@
{
"name": "MAP - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='JSON');",
Expand All @@ -367,7 +367,7 @@
{
"name": "nested MAP type - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down Expand Up @@ -411,7 +411,7 @@
{
"name": "STRUCT - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F DOUBLE> KEY, V INT) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down
Loading

0 comments on commit 47a9c95

Please sign in to comment.