Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: split key format feature flag #6695

Merged
merged 3 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,17 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_KEY_FORMAT_ENABLED = "ksql.key.format.enabled";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i've merged the other PR, so if you want to we can rename this (or just leave it as is, i'm OK with that too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it for now since the old name is already embedded in some historic plans (with value false), no need to introduce yet another config to clutter the history. Plus this flag is going away soon ;D

public static final Boolean KSQL_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_KEY_FORMAT_ENABLED_DOC =
"Feature flag for non-Kafka key formats";
"Feature flag for key formats under development";

public static final String KSQL_COMPLEX_KEY_FORMAT_ENABLED = "ksql.complex.key.format.enabled";
public static final Boolean KSQL_COMPLEX_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_COMPLEX_KEY_FORMAT_ENABLED_DOC =
"Feature flag for complex (non-primitive) keys";

public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED = "ksql.multicol.key.format.enabled";
public static final Boolean KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC =
"Feature flag for multi-column keys";

public static final String KSQL_DEFAULT_KEY_FORMAT_CONFIG = "ksql.persistence.default.format.key";
private static final String KSQL_DEFAULT_KEY_FORMAT_DEFAULT = "KAFKA";
Expand Down Expand Up @@ -613,6 +623,18 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_COMPLEX_KEY_FORMAT_ENABLED,
Type.BOOLEAN,
KSQL_COMPLEX_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_COMPLEX_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_MULTICOL_KEY_FORMAT_ENABLED,
Type.BOOLEAN,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_DEFAULT_KEY_FORMAT_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class PlannedTestUtils {
Expand All @@ -48,9 +49,10 @@ public static boolean isPlannedTestCase(final TestCase testCase) {

public static boolean isNotExcluded(final TestCase testCase) {
// Place temporary logic here to exclude test cases based on feature flags, etc.
return !(boolean) testCase
.properties()
.getOrDefault(KsqlConfig.KSQL_KEY_FORMAT_ENABLED, false);
final Map<String, Object> props = testCase.properties();
return !(boolean) props.getOrDefault(KsqlConfig.KSQL_KEY_FORMAT_ENABLED, false)
&& !(boolean) props.getOrDefault(KsqlConfig.KSQL_COMPLEX_KEY_FORMAT_ENABLED, false)
&& !(boolean) props.getOrDefault(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED, false);
}

public static boolean isSamePlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
{
"name": "ARRAY",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (ID ARRAY<VARCHAR> KEY, V0 INT, V1 INT) WITH (kafka_topic='input', format='JSON');",
Expand All @@ -174,7 +174,7 @@
{
"name": "STRUCT",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (ID STRUCT<F1 VARCHAR> KEY, V0 INT, V1 INT) WITH (kafka_topic='input', format='JSON');",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@
{
"name": "ARRAY - key - no inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -880,7 +880,7 @@
{
"name": "ARRAY - key - inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand Down Expand Up @@ -908,7 +908,7 @@
{
"name": "MAP - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');"
Expand All @@ -921,7 +921,7 @@
{
"name": "nested MAP type - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<MAP<INT, DOUBLE>> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');"
Expand All @@ -934,7 +934,7 @@
{
"name": "MAP - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -948,7 +948,7 @@
{
"name": "nested MAP type - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -962,7 +962,7 @@
{
"name": "STRUCT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F1 VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='AVRO');",
Expand All @@ -982,7 +982,7 @@
{
"name": "STRUCT - key - inference",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"topics": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
{
"name": "ARRAY - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<INT> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -153,7 +153,7 @@
{
"name": "ARRAY - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (v INT) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand All @@ -167,7 +167,7 @@
{
"name": "MAP - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -180,7 +180,7 @@
{
"name": "MAP - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand All @@ -194,7 +194,7 @@
{
"name": "STRUCT - C* - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F1 DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='DELIMITED');"
Expand All @@ -207,7 +207,7 @@
{
"name": "STRUCT - C*AS - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (v DOUBLE) WITH (kafka_topic='input_topic', format='DELIMITED');",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "windowed aggregate with struct key",
"format": ["JSON", "AVRO"],
Expand All @@ -2279,7 +2279,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "windowed aggregate with field within struct key",
"statements": [
Expand Down Expand Up @@ -2358,7 +2358,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "AVRO struct key group by primitive",
"statements": [
Expand Down Expand Up @@ -2403,7 +2403,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "AVRO group by struct",
"statements": [
Expand Down Expand Up @@ -2474,7 +2474,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by array",
"statements": [
Expand All @@ -2494,7 +2494,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct",
"statements": [
Expand All @@ -2514,7 +2514,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct convert key format",
"statements": [
Expand Down Expand Up @@ -2554,7 +2554,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "JSON group by struct convert to incompatible key format",
"statements": [
Expand All @@ -2568,7 +2568,7 @@
},
{
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"name": "Struct key used in aggregate expression",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@
{
"name": "ARRAY - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<DOUBLE> KEY, V INT) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down Expand Up @@ -327,7 +327,7 @@
{
"name": "MAP - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K MAP<INT, DOUBLE> KEY, foo INT) WITH (kafka_topic='input_topic', format='JSON');"
Expand All @@ -340,7 +340,7 @@
{
"name": "nested MAP type - key - C*",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K ARRAY<MAP<INT, DOUBLE>> KEY, foo INT) WITH (kafka_topic='input_topic', format='JSON');"
Expand All @@ -353,7 +353,7 @@
{
"name": "MAP - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='JSON');",
Expand All @@ -367,7 +367,7 @@
{
"name": "nested MAP type - key - C*AS",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (k STRING KEY, v STRING) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down Expand Up @@ -411,7 +411,7 @@
{
"name": "STRUCT - key",
"properties": {
"ksql.key.format.enabled": true
"ksql.complex.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (K STRUCT<F DOUBLE> KEY, V INT) WITH (kafka_topic='input_topic', format='JSON');",
Expand Down
Loading