Skip to content

Commit

Permalink
chore: disable pull query support from /ksql endpoint (#3862)
Browse files Browse the repository at this point in the history
* chore: disable pull query support from `/ksql` endpoint
* chore: update error messages to include doc link

fixes: #3850
fixes: #3836
fixes: #3847

Support for pull queries has moved to the `/query` endpoint

Need to disable temporarily until RQTT can handle pull queries.

#3864 tracks re-enabling.
  • Loading branch information
big-andy-coates authored Nov 15, 2019
1 parent ab4e409 commit b04794f
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@

public class StaticQueryValidator implements QueryValidator {

private static final String NEW_QUERY_SYNTAX_HELP = " "
+ "Did you mean to execute a push query? Add an 'EMIT CHANGES' clause to do so."
+ System.lineSeparator()
+ System.lineSeparator()
private static final String PUSH_PULL_QUERY_DOC_LINK = "https://cnfl.io/queries";

public static final String NEW_QUERY_SYNTAX_SHORT_HELP = ""
+ "Refer to " + PUSH_PULL_QUERY_DOC_LINK + " for info on query types. "
+ "If you intended to issue a push query, resubmit with the EMIT CHANGES clause";

public static final String NEW_QUERY_SYNTAX_ADDITIONAL_HELP = ""
+ "Query syntax in KSQL has changed. There are now two broad categories of queries:"
+ System.lineSeparator()
+ "- Pull queries: query the current state of the system, return a result, and terminate "
+ "the query."
+ "- Pull queries: query the current state of the system, return a result, and terminate. "
+ System.lineSeparator()
+ "- Push queries: query the state of the system in motion and continue to output "
+ "results until they meet a LIMIT clause condition or the user terminates the query."
+ "results until they meet a LIMIT condition or are terminated by the user."
+ System.lineSeparator()
+ System.lineSeparator()
+ "'EMIT CHANGES' is used to to indicate a query is a push query. "
Expand All @@ -59,44 +61,50 @@ public class StaticQueryValidator implements QueryValidator {
+ "Note: Persistent queries, e.g. `CREATE TABLE AS ...`, have an implicit "
+ "`EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.";

private static final String NEW_QUERY_SYNTAX_LONG_HELP = ""
+ NEW_QUERY_SYNTAX_SHORT_HELP
+ System.lineSeparator()
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_ADDITIONAL_HELP;

private static final List<Rule> RULES = ImmutableList.of(
Rule.of(
analysis -> analysis.getResultMaterialization() == ResultMaterialization.FINAL,
"Static queries don't support `EMIT CHANGES`."
"Pull queries don't support `EMIT CHANGES`."
),
Rule.of(
analysis -> !analysis.getInto().isPresent(),
"Static queries don't support output to sinks."
"Pull queries don't support output to sinks."
),
Rule.of(
analysis -> !analysis.isJoin(),
"Static queries don't support JOIN clauses."
"Pull queries don't support JOIN clauses."
),
Rule.of(
analysis -> !analysis.getWindowExpression().isPresent(),
"Static queries don't support WINDOW clauses."
"Pull queries don't support WINDOW clauses."
),
Rule.of(
analysis -> analysis.getGroupByExpressions().isEmpty(),
"Static queries don't support GROUP BY clauses."
"Pull queries don't support GROUP BY clauses."
),
Rule.of(
analysis -> !analysis.getPartitionBy().isPresent(),
"Static queries don't support PARTITION BY clauses."
"Pull queries don't support PARTITION BY clauses."
),
Rule.of(
analysis -> !analysis.getHavingExpression().isPresent(),
"Static queries don't support HAVING clauses."
"Pull queries don't support HAVING clauses."
),
Rule.of(
analysis -> !analysis.getLimitClause().isPresent(),
"Static queries don't support LIMIT clauses."
"Pull queries don't support LIMIT clauses."
),
Rule.of(
analysis -> analysis.getSelectColumnRefs().stream()
.map(ColumnRef::name)
.noneMatch(n -> n.equals(SchemaUtil.ROWTIME_NAME)),
"Static queries don't support ROWTIME in select columns."
"Pull queries don't support ROWTIME in select columns."
)
);

Expand All @@ -105,7 +113,7 @@ public void validate(final Analysis analysis) {
try {
RULES.forEach(rule -> rule.check(analysis));
} catch (final KsqlException e) {
throw new KsqlException(e.getMessage() + NEW_QUERY_SYNTAX_HELP, e);
throw new KsqlException(e.getMessage() + " " + NEW_QUERY_SYNTAX_LONG_HELP, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ public void setUp() {
}

@Test
public void shouldThrowOnStaticQueryThatIsNotFinal() {
public void shouldThrowOnPullQueryThatIsNotFinal() {
// Given:
when(analysis.getResultMaterialization()).thenReturn(ResultMaterialization.CHANGES);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support `EMIT CHANGES`");
expectedException.expectMessage("Pull queries don't support `EMIT CHANGES`");

// When:
validator.validate(analysis);
}

@Test(expected = KsqlException.class)
public void shouldThrowOnStaticQueryIfSinkSupplied() {
public void shouldThrowOnPullQueryIfSinkSupplied() {
// Given:
when(analysis.getInto()).thenReturn(Optional.of(into));

Expand All @@ -85,27 +85,27 @@ public void shouldThrowOnStaticQueryIfSinkSupplied() {
}

@Test
public void shouldThrowOnStaticQueryThatIsJoin() {
public void shouldThrowOnPullQueryThatIsJoin() {
// Given:
when(analysis.isJoin()).thenReturn(true);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support JOIN clauses.");
expectedException.expectMessage("Pull queries don't support JOIN clauses.");

// When:
validator.validate(analysis);
}

@Test
public void shouldThrowOnStaticQueryThatIsWindowed() {
public void shouldThrowOnPullQueryThatIsWindowed() {
// Given:

when(analysis.getWindowExpression()).thenReturn(Optional.of(windowExpression));

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support WINDOW clauses.");
expectedException.expectMessage("Pull queries don't support WINDOW clauses.");

// When:
validator.validate(analysis);
Expand All @@ -118,7 +118,7 @@ public void shouldThrowOnGroupBy() {

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support GROUP BY clauses.");
expectedException.expectMessage("Pull queries don't support GROUP BY clauses.");

// When:
validator.validate(analysis);
Expand All @@ -131,7 +131,7 @@ public void shouldThrowOnPartitionBy() {

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support PARTITION BY clauses.");
expectedException.expectMessage("Pull queries don't support PARTITION BY clauses.");

// When:
validator.validate(analysis);
Expand All @@ -144,7 +144,7 @@ public void shouldThrowOnHavingClause() {

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support HAVING clauses.");
expectedException.expectMessage("Pull queries don't support HAVING clauses.");

// When:
validator.validate(analysis);
Expand All @@ -157,7 +157,7 @@ public void shouldThrowOnLimitClause() {

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support LIMIT clauses.");
expectedException.expectMessage("Pull queries don't support LIMIT clauses.");

// When:
validator.validate(analysis);
Expand All @@ -171,7 +171,7 @@ public void shouldThrowOnRowTimeInProjection() {

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Static queries don't support ROWTIME in select columns.");
expectedException.expectMessage("Pull queries don't support ROWTIME in select columns.");

// When:
validator.validate(analysis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.confluent.ksql.services;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

import io.confluent.ksql.services.DefaultServiceContext.MemoizedSupplier;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import java.net.URI;
import java.util.List;

/**
* A KSQL client implementation for use when communication with other nodes is not supported.
Expand All @@ -35,5 +37,10 @@ private DisabledKsqlClient() {
public RestResponse<KsqlEntityList> makeKsqlRequest(URI serverEndPoint, String sql) {
throw new UnsupportedOperationException("KSQL client is disabled");
}

@Override
public RestResponse<List<StreamedRow>> makeQueryRequest(URI serverEndPoint, String sql) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import java.net.URI;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
Expand All @@ -27,4 +29,9 @@ RestResponse<KsqlEntityList> makeKsqlRequest(
URI serverEndPoint,
String sql
);

RestResponse<List<StreamedRow>> makeQueryRequest(
URI serverEndPoint,
String sql
);
}
Loading

0 comments on commit b04794f

Please sign in to comment.