Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SOURCE streams/tables to query anonymizer #8022

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ public String visitCreateStreamAs(final CreateStreamAsContext context) {
public String visitCreateStream(final CreateStreamContext context) {
final StringBuilder stringBuilder = new StringBuilder("CREATE ");

// optional source
if (context.SOURCE() != null) {
stringBuilder.append("SOURCE ");
}

// optional replace
if (context.OR() != null && context.REPLACE() != null) {
stringBuilder.append("OR REPLACE ");
Expand Down Expand Up @@ -637,6 +642,11 @@ public String visitCreateTableAs(final CreateTableAsContext context) {
public String visitCreateTable(final CreateTableContext context) {
final StringBuilder stringBuilder = new StringBuilder("CREATE ");

// optional source
if (context.SOURCE() != null) {
stringBuilder.append("SOURCE ");
}

// optional replace
if (context.OR() != null && context.REPLACE() != null) {
stringBuilder.append("OR REPLACE ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ public void shouldAnonymizeCreateStreamQueryCorrectly() {
Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateSourceStreamQueryCorrectly() {
final String output = anon.anonymize(
"CREATE SOURCE STREAM my_stream (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n"
+ "WITH (kafka_topic='locations', value_format='json');");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateStreamAsQueryCorrectly() {
final String output = anon.anonymize(
Expand All @@ -181,6 +190,15 @@ public void shouldAnonymizeCreateTableCorrectly() {
Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateSourceTableCorrectly() {
final String output = anon.anonymize(
"CREATE SOURCE TABLE my_table (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n"
+ "WITH (kafka_topic='locations', value_format='json');");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateTableAsQueryCorrectly() {
final String output = anon.anonymize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE SOURCE STREAM stream1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE SOURCE TABLE table1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]');