diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java index 5755896c87dc..da4fe1d15cb0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java @@ -571,6 +571,11 @@ public String visitCreateStreamAs(final CreateStreamAsContext context) { public String visitCreateStream(final CreateStreamContext context) { final StringBuilder stringBuilder = new StringBuilder("CREATE "); + // optional source + if (context.SOURCE() != null) { + stringBuilder.append("SOURCE "); + } + // optional replace if (context.OR() != null && context.REPLACE() != null) { stringBuilder.append("OR REPLACE "); @@ -637,6 +642,11 @@ public String visitCreateTableAs(final CreateTableAsContext context) { public String visitCreateTable(final CreateTableContext context) { final StringBuilder stringBuilder = new StringBuilder("CREATE "); + // optional source + if (context.SOURCE() != null) { + stringBuilder.append("SOURCE "); + } + // optional replace if (context.OR() != null && context.REPLACE() != null) { stringBuilder.append("OR REPLACE "); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java index 3df49a77521e..d617f295c594 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java @@ -160,6 +160,15 @@ public void shouldAnonymizeCreateStreamQueryCorrectly() { Approvals.verify(output); } + @Test + public void shouldAnonymizeCreateSourceStreamQueryCorrectly() { + final String output = anon.anonymize( + "CREATE SOURCE STREAM my_stream (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n" + + "WITH (kafka_topic='locations', value_format='json');"); + + Approvals.verify(output); + } + @Test public void shouldAnonymizeCreateStreamAsQueryCorrectly() { final String output = anon.anonymize( @@ -181,6 +190,15 @@ public void shouldAnonymizeCreateTableCorrectly() { Approvals.verify(output); } + @Test + public void shouldAnonymizeCreateSourceTableCorrectly() { + final String output = anon.anonymize( + "CREATE SOURCE TABLE my_table (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n" + + "WITH (kafka_topic='locations', value_format='json');"); + + Approvals.verify(output); + } + @Test public void shouldAnonymizeCreateTableAsQueryCorrectly() { final String output = anon.anonymize( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceStreamQueryCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceStreamQueryCorrectly.approved.txt new file mode 100644 index 000000000000..22c1c3892e71 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceStreamQueryCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE SOURCE STREAM stream1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceTableCorrectly.approved.txt b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceTableCorrectly.approved.txt new file mode 100644 index 000000000000..b1081530a85a --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.shouldAnonymizeCreateSourceTableCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE SOURCE TABLE table1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]'); \ No newline at end of file