Skip to content

Commit

Permalink
chore: modify CST query to include the table name
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Aug 27, 2021
1 parent 09487c6 commit 2373945
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,12 @@ private ExecutorPlans planQuery(
Optional.of(outputNode)
);
final QueryId queryId = QueryIdUtil.buildId(
statement.getStatement(),
engineContext,
engineContext.idGenerator(),
outputNode,
ksqlConfig.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED),
withQueryId,
statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource()
withQueryId
);

if (withQueryId.isPresent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.query.QueryId;
Expand Down Expand Up @@ -82,35 +84,36 @@ private static void validateWithQueryId(final String queryId) {
/**
* Builds a {@link QueryId} for a physical plan specification.
*
* @param statement the statement that requires the query ID
* @param engineContext the context representing the current state of the engine
* @param idGenerator generates query ids
* @param outputNode the logical plan
* @param createOrReplaceEnabled whether or not the queryID can replace an existing one
* @return the {@link QueryId} to be used
*/
static QueryId buildId(
final Statement statement,
final EngineContext engineContext,
final QueryIdGenerator idGenerator,
final OutputNode outputNode,
final boolean createOrReplaceEnabled,
final Optional<String> withQueryId,
final boolean isSourceTable) {
final Optional<String> withQueryId) {
if (withQueryId.isPresent()) {
final String queryId = withQueryId.get().toUpperCase();
validateWithQueryId(queryId);
return new QueryId(queryId);
}
if (statement instanceof CreateTable && ((CreateTable) statement).isSource()) {
// Use the CST name as part of the QueryID
final String suffix = ((CreateTable) statement).getName().text().toUpperCase()
+ "_" + idGenerator.getNext().toUpperCase();
return new QueryId(ReservedQueryIdsPrefixes.CST + suffix);
}

if (!outputNode.getSinkName().isPresent()) {
if (isSourceTable) {
final String suffix = outputNode.getId().toString().toUpperCase()
+ "_" + idGenerator.getNext().toUpperCase();
return new QueryId(ReservedQueryIdsPrefixes.CST + suffix);
} else {
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
}
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
}

final KsqlStructuredDataOutputNode structured = (KsqlStructuredDataOutputNode) outputNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public QueryPlan(
Comparator.comparing(Name::text),
Objects.requireNonNull(sources, "sources")
);
// Sink can be null when a query plan forms part of a create source table command.
// CST statements need a plan without a sink topic.
this.sink = sink;
this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlan");
this.queryId = Objects.requireNonNull(queryId, "queryId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
Expand Down Expand Up @@ -62,6 +65,8 @@ public class QueryIdUtilTest {
private DataSourceNode dataSourceNode;
@Mock
private SourceName sourceName;
@Mock
private Statement statement;
@Before
public void setup() {
when(engineContext.getQueryRegistry()).thenReturn(queryRegistry);
Expand All @@ -78,8 +83,8 @@ public void shouldGenerateUniqueRandomIdsForTransientQueries() {

// When:
long numUniqueIds = IntStream.range(0, 100)
.mapToObj(i -> QueryIdUtil.buildId(engineContext, idGenerator, transientPlan,
false, Optional.empty(), false))
.mapToObj(i -> QueryIdUtil.buildId(statement, engineContext, idGenerator, transientPlan,
false, Optional.empty()))
.distinct()
.count();

Expand All @@ -94,8 +99,8 @@ public void shouldComputeQueryIdCorrectlyForInsertInto() {
when(idGenerator.getNext()).thenReturn("1");

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.empty(), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.empty());

// Then:
assertThat(queryId, is(new QueryId("INSERTQUERY_1")));
Expand All @@ -112,8 +117,8 @@ public void shouldComputeQueryIdCorrectlyForNewStream() {
when(queryRegistry.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.empty(), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.empty());
// Then:
assertThat(queryId, is(new QueryId("CSAS_FOO_1")));
}
Expand All @@ -129,8 +134,8 @@ public void shouldComputeQueryIdCorrectlyForNewTable() {
when(queryRegistry.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.empty(), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.empty());

// Then:
assertThat(queryId, is(new QueryId("CTAS_FOO_1")));
Expand All @@ -139,13 +144,14 @@ public void shouldComputeQueryIdCorrectlyForNewTable() {
@Test
public void shouldComputeQueryIdCorrectlyForNewSourceTable() {
// Given:
when(plan.getSinkName()).thenReturn(Optional.empty());
when(plan.getId()).thenReturn(new PlanNodeId("FOO"));
final CreateTable createTableStmt = mock(CreateTable.class);
when(createTableStmt.getName()).thenReturn(SourceName.of("FOO"));
when(createTableStmt.isSource()).thenReturn(true);
when(idGenerator.getNext()).thenReturn("1");

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.empty(), true);
final QueryId queryId = QueryIdUtil.buildId(createTableStmt, engineContext, idGenerator, plan,
false, Optional.empty());

// Then:
assertThat(queryId, is(new QueryId("CST_FOO_1")));
Expand All @@ -160,8 +166,8 @@ public void shouldReuseExistingQueryId() {
.thenReturn(ImmutableSet.of(new QueryId("CTAS_FOO_10")));

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
true, Optional.empty(), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
true, Optional.empty());

// Then:
assertThat(queryId, is(new QueryId("CTAS_FOO_10")));
Expand All @@ -177,8 +183,8 @@ public void shouldThrowOnReuseIfCreateOrReplacedIsDisabled() {
.thenReturn(ImmutableSet.of(new QueryId("CTAS_FOO_10")));

// When:
QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.empty(), false);
QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.empty());
}

@Test
Expand All @@ -191,8 +197,8 @@ public void shouldThrowIfMultipleQueriesExist() {

// When:
final KsqlException e = assertThrows(KsqlException.class, () ->
QueryIdUtil.buildId(engineContext, idGenerator, plan, false,
Optional.empty(), false));
QueryIdUtil.buildId(statement, engineContext, idGenerator, plan, false,
Optional.empty()));

// Then:
assertThat(e.getMessage(), containsString("there are multiple queries writing"));
Expand All @@ -201,8 +207,8 @@ public void shouldThrowIfMultipleQueriesExist() {
@Test
public void shouldReturnWithQueryIdInUppercase(){
// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.of("my_query_id"), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.of("my_query_id"));

// Then:
assertThat(queryId, is(new QueryId("MY_QUERY_ID")));
Expand All @@ -213,8 +219,8 @@ public void shouldThrowIfWithQueryIdIsReserved() {
// When:
final Exception e = assertThrows(
Exception.class,
() -> QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.of("insertquery_custom"), false)
() -> QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.of("insertquery_custom"))
);

// Then:
Expand All @@ -228,8 +234,8 @@ public void shouldThrowIfWithQueryIdIsNotValid() {
// When:
final Exception e = assertThrows(
Exception.class,
() -> QueryIdUtil.buildId(engineContext, idGenerator, plan,
false, Optional.of("with space"), false)
() -> QueryIdUtil.buildId(statement, engineContext, idGenerator, plan,
false, Optional.of("with space"))
);

// Then:
Expand All @@ -247,8 +253,8 @@ public void shouldCreateTransientQueryIdWithSourceName() {
when(sourceName.text()).thenReturn(SOURCE);

// When:
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, transientPlan,
false, Optional.empty(), false);
final QueryId queryId = QueryIdUtil.buildId(statement, engineContext, idGenerator, transientPlan,
false, Optional.empty());

// Then:
assertThat(queryId.toString(), containsString("transient_source"));
Expand Down

0 comments on commit 2373945

Please sign in to comment.