Skip to content

Commit

Permalink
fix: stream pull query internal overrides shouldn't clash with query …
Browse files Browse the repository at this point in the history
…configs (#8166)

Previously, if you SET 'auto.offset.reset'='earliest' or 'latest', the query would fail
with an exception. This patch fixes that and simply overrides the configs as desired.
  • Loading branch information
vvcephei authored Sep 24, 2021
1 parent cb82810 commit 0c57258
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.vertx.core.Context;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -343,18 +344,17 @@ public StreamPullQueryMetadata createStreamPullQuery(
}

// Stream pull query overrides.
final ConfiguredStatement<Query> statement = statementOrig.withConfigOverrides(
ImmutableMap.<String, Object>builder()
.putAll(statementOrig.getSessionConfig().getOverrides())
// Starting from earliest is semantically necessary.
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// Using a single thread keeps these queries as lightweight as possible, since we are
// not counting them against the transient query limit.
.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
// There's no point in EOS, since this query only produces side effects.
.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE)
.build()
);
final Map<String, Object> overrides =
new HashMap<>(statementOrig.getSessionConfig().getOverrides());
// Starting from earliest is semantically necessary.
overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Using a single thread keeps these queries as lightweight as possible, since we are
// not counting them against the transient query limit.
overrides.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
// There's no point in EOS, since this query only produces side effects.
overrides.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);

final ConfiguredStatement<Query> statement = statementOrig.withConfigOverrides(overrides);
final ImmutableMap<TopicPartition, Long> endOffsets =
getQueryInputEndOffsets(analysis, serviceContext.getAdminClient());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -103,7 +102,6 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -499,6 +497,42 @@ public void shouldExecutePushQueryThatReturnsStreamOverRestV1() {
assertThat(messages.get(3), is("{\"finalMessage\":\"Limit Reached\"}]"));
}

@Test
public void shouldExecutePullQueryThatReturnsStreamOverRestV1AndIgnoreReset() {
// When:
final KsqlRequest request =
new KsqlRequest(
"SELECT USERID, PAGEID, VIEWTIME from " + PAGE_VIEW_STREAM + ";",
ImmutableMap.of("auto.offset.reset", "latest"), // should get ignored
ImmutableMap.of(),
null
);

final String response = RestIntegrationTestUtil.rawRestRequest(REST_APP, HTTP_1_1, POST,
"/query", request, KsqlMediaType.KSQL_V1_JSON.mediaType(),
Optional.empty())
.body()
.toString();

// Then:

assertThat(
response.replaceFirst("queryId\":\"transient_[^\"]*\"", "queryId\":\"XYZ\""),
equalTo(
"[{\"header\":{\"queryId\":\"XYZ\",\"schema\":\"`USERID` STRING, `PAGEID` STRING, `VIEWTIME` BIGINT\"}},\n"
+ "{\"row\":{\"columns\":[\"USER_1\",\"PAGE_1\",1]}},\n"
+ "{\"row\":{\"columns\":[\"USER_2\",\"PAGE_2\",2]}},\n"
+ "{\"row\":{\"columns\":[\"USER_4\",\"PAGE_3\",3]}},\n"
+ "{\"row\":{\"columns\":[\"USER_3\",\"PAGE_4\",4]}},\n"
+ "{\"row\":{\"columns\":[\"USER_0\",\"PAGE_5\",5]}},\n"
+ "{\"row\":{\"columns\":[\"USER_2\",\"PAGE_5\",6]}},\n"
+ "{\"row\":{\"columns\":[\"USER_3\",\"PAGE_5\",7]}},\n"
+ "\n"
+ "]\n"
)
);
}

@Test
public void shouldExecutePushQueryThatReturnsTableOverRestV1() {
// When:
Expand Down

0 comments on commit 0c57258

Please sign in to comment.