Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: query id for TERMINATE should be case insensitive #5005

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs-md/tutorials/basics-docker.md
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ Your output should resemble:
```
Message
------------------------------------------------------------
Insert Into query is running with query ID: InsertQuery_43
Insert Into query is running with query ID: INSERTQUERY_43
------------------------------------------------------------
```

Expand Down Expand Up @@ -1305,7 +1305,7 @@ Your output should resemble:
```
Query ID | Status | Sink Name | Sink Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
InsertQuery_43 | RUNNING | ALL_ORDERS | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
INSERTQUERY_43 | RUNNING | ALL_ORDERS | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
CSAS_ALL_ORDERS_17 | RUNNING | ALL_ORDERS | ALL_ORDERS | CREATE STREAM ALL_ORDERS WITH (KAFKA_TOPIC='ALL_ORDERS', PARTITIONS=1, REPLICAS=1) AS SELECT 'LOCAL' SRC, *FROM ORDERS_SRC_LOCAL ORDERS_SRC_LOCALEMIT CHANGES;
...
```
Expand Down
2 changes: 1 addition & 1 deletion docs/includes/ksql-includes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ Your output should resemble:
Query ID | Kafka Topic | Query String
-------------------------------------------------------------------------------------------------------------------
CSAS_ALL_ORDERS_0 | ALL_ORDERS | CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL;
InsertQuery_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY;
INSERTQUERY_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY;
-------------------------------------------------------------------------------------------------------------------

.. insert-into_02_end
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,25 @@
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;

/**
* A query id.
*
* <p>For backwards compatibility reasons query ids must preserve their case, as their text
* representation is used, amoung other things, for internal topic naming.
*
* <p>However, two ids with the same text, with different case, should compare equal. This is needed
* so that look ups against query ids are not case-sensitive.
*/
@Immutable
public class QueryId {

private final String id;
private final String cachedUpperCase;

@JsonCreator
public QueryId(final String id) {
this.id = requireNonNull(id, "id");
this.cachedUpperCase = id.toUpperCase();
}

@JsonValue
Expand All @@ -46,11 +57,11 @@ public boolean equals(final Object o) {
return false;
}
final QueryId queryId = (QueryId) o;
return Objects.equals(id, queryId.id);
return Objects.equals(cachedUpperCase, queryId.cachedUpperCase);
}

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(cachedUpperCase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,61 @@
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.json.JsonMapper;
import org.junit.Test;

public class QueryIdTest {

private final ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldSerializeCorrectly() throws Exception {
public void shouldImplementEqualsProperly() {
new EqualsTester()
.addEqualityGroup(new QueryId("matching"), new QueryId("MaTcHiNg"))
.addEqualityGroup(new QueryId("different"))
.testEquals();
}

@Test
public void shouldBeCaseInsensitiveOnCommparison() {
// When:
final QueryId id = new QueryId("Mixed-Case-Id");

// Then:
assertThat(id, is(new QueryId("MIXED-CASE-ID")));
}

@Test
public void shouldPreserveCase() {
// When:
final QueryId id = new QueryId("Mixed-Case-Id");

// Then:
assertThat(id.toString(), is("Mixed-Case-Id"));
}

@Test
public void shouldSerializeMaintainingCase() throws Exception {
// Given:
final QueryId id = new QueryId("query-id");
final QueryId id = new QueryId("Query-Id");

// When:
final String serialized = objectMapper.writeValueAsString(id);

assertThat(serialized, is("\"query-id\""));
assertThat(serialized, is("\"Query-Id\""));
}

@Test
public void shouldDeserializeCorrectly() throws Exception {
public void shouldDeserializeMaintainingCase() throws Exception {
// Given:
final String serialized = "\"an-id\"";
final String serialized = "\"An-Id\"";

// When:
final QueryId deserialized = objectMapper.readValue(serialized, QueryId.class);

// Then:
assertThat(deserialized, is(new QueryId("an-id")));
assertThat(deserialized.getId(), is("An-Id"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ public SourceName getIntoSourceName() {

@Override
public QueryId getQueryId(final QueryIdGenerator queryIdGenerator) {
final String base = queryIdGenerator.getNext();
final String base = queryIdGenerator.getNext().toUpperCase();
if (!doCreateInto) {
return new QueryId("InsertQuery_" + base);
return new QueryId("INSERTQUERY_" + base);
}
if (getNodeOutputType().equals(DataSourceType.KTABLE)) {
return new QueryId("CTAS_" + getId().toString() + "_" + base);
return new QueryId("CTAS_" + getId().toString().toUpperCase() + "_" + base);
}
return new QueryId("CSAS_" + getId().toString() + "_" + base);
return new QueryId("CSAS_" + getId().toString().toUpperCase() + "_" + base);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@ public void shouldCreateExecutionPlanForInsert() {
final String[] lines = planText.split("\n");
Assert.assertTrue(lines.length == 3);
Assert.assertEquals(lines[0],
" > [ SINK ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE | Logger: InsertQuery_1.S1");
" > [ SINK ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE | Logger: INSERTQUERY_1.S1");
Assert.assertEquals(lines[1],
"\t\t > [ PROJECT ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE | Logger: InsertQuery_1.Project");
"\t\t > [ PROJECT ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE | Logger: INSERTQUERY_1.Project");
Assert.assertEquals(lines[2],
"\t\t\t\t > [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE, ROWTIME BIGINT, ROWKEY STRING | Logger: InsertQuery_1.KsqlTopic.Source");
"\t\t\t\t > [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE, ROWTIME BIGINT, ROWKEY STRING | Logger: INSERTQUERY_1.KsqlTopic.Source");
assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class));
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata)
queryMetadataList.get(1);
Expand Down Expand Up @@ -299,12 +299,12 @@ public void shouldRekeyIfPartitionByDoesNotMatchResultKey() {
final String[] lines = planText.split("\n");
assertThat(lines.length, equalTo(4));
assertThat(lines[0], equalTo(" > [ SINK ] | Schema: ROWKEY BIGINT KEY, COL0 BIGINT, COL1 STRING, COL2 "
+ "DOUBLE | Logger: InsertQuery_1.S1"));
+ "DOUBLE | Logger: INSERTQUERY_1.S1"));
assertThat(lines[2],
containsString("[ REKEY ] | Schema: ROWKEY BIGINT KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE, ROWTIME BIGINT, ROWKEY STRING "
+ "| Logger: InsertQuery_1.PartitionBy"));
+ "| Logger: INSERTQUERY_1.PartitionBy"));
assertThat(lines[1], containsString("[ PROJECT ] | Schema: ROWKEY BIGINT KEY, COL0 BIGINT, COL1 STRING"
+ ", COL2 DOUBLE | Logger: InsertQuery_1.Project"));
+ ", COL2 DOUBLE | Logger: INSERTQUERY_1.Project"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void shouldComputeQueryIdCorrectlyForInsertInto() {

// Then:
verify(queryIdGenerator, times(1)).getNext();
assertThat(queryId, equalTo(new QueryId("InsertQuery_" + QUERY_ID_VALUE)));
assertThat(queryId, equalTo(new QueryId("INSERTQUERY_" + QUERY_ID_VALUE)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,7 @@ public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext contex
? TerminateQuery.all(location)
: TerminateQuery.query(
location,
// use case sensitive parsing here to maintain backwards compatibility
new QueryId(ParserUtil.getIdentifierText(true, context.identifier()))
new QueryId(ParserUtil.getIdentifierText(false, context.identifier()))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public void shouldRecoverRecreates() {
server1.submitCommands(
"CREATE STREAM A (C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT C1 FROM A;",
"TERMINATE CSAS_B_0;",
"TERMINATE CsAs_b_0;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT C2 FROM A;"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ public void shouldThrowOnTerminateUnknownQuery() {
expectedException.expect(KsqlRestException.class);
expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST)));
expectedException.expect(exceptionErrorMessage(errorMessage(is(
"Unknown queryId: unknown_query_id"))));
"Unknown queryId: UNKNOWN_QUERY_ID"))));
expectedException.expect(exceptionStatementErrorMessage(statement(is(
"TERMINATE unknown_query_id;"))));

Expand Down