Skip to content

Commit

Permalink
chore: passing consistency token using request properties in HARouting
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Stuedi committed Nov 4, 2021
1 parent a651677 commit 20c2d2f
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.util.KsqlRequestConfig;
import io.confluent.ksql.util.OffsetVector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -340,10 +341,14 @@ private static void forwardTo(
.map(location -> Integer.toString(location.getPartition()))
.collect(Collectors.joining(","));
// Add skip forward flag to properties
final Map<String, Object> requestProperties = ImmutableMap.of(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final Map<String, Object> requestProperties = new HashMap<>();
requestProperties.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true);
requestProperties.put(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true);
requestProperties.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
if (consistencyOffsetVector.isPresent()) {
requestProperties.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
consistencyOffsetVector.get().serialize());
}
final RestResponse<Integer> response;

try {
Expand All @@ -355,8 +360,7 @@ private static void forwardTo(
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests,
consistencyOffsetVector.map(ConsistencyOffsetVector::serialize)
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -229,7 +229,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());
assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW1));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -248,7 +248,7 @@ public void shouldCallRouteQuery_twoRound() throws InterruptedException, Executi
throw new StandbyFallbackException("Error!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -285,7 +285,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -309,7 +309,7 @@ public void shouldCallRouteQuery_partitionFailure() throws InterruptedException,
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());

when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -350,7 +350,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(4));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -369,7 +369,7 @@ public void shouldCallRouteQuery_twoRound_networkError()
throws InterruptedException, ExecutionException {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
throw new RuntimeException("Network error!");
}
Expand All @@ -387,7 +387,7 @@ public void shouldCallRouteQuery_twoRound_networkError()

// Then:
verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any(), any());
any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location2.removeHeadHost())), any(), any(), any());

assertThat(pullQueryQueue.size(), is(1));
Expand All @@ -409,7 +409,7 @@ public void shouldCallRouteQuery_allStandbysFail() {
doAnswer(i -> {
throw new StandbyFallbackException("Error1!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -453,7 +453,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(e.getCause().getMessage(), containsString("Exhausted standby hosts to try."));

Expand Down Expand Up @@ -514,7 +514,7 @@ public void shouldCallRouteQuery_allFilteredWithCause() {
public void shouldNotRouteToFilteredHost() throws InterruptedException, ExecutionException {
// Given:
location1 = new PartitionLocation(Optional.empty(), 1, ImmutableList.of(badNode, node1));
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any()))
.then(invocationOnMock -> RestResponse.successful(200, 2));
locate(location1, location2, location3, location4);

Expand All @@ -533,7 +533,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio

// Then:
verify(ksqlClient, never())
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any(), any());
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any());

final double fetch_count = getMetricValue("-partition-fetch-count");
final double resubmission_count = getMetricValue("-partition-fetch-resubmission-count");
Expand All @@ -545,7 +545,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio
public void forwardingError_errorRow() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -582,7 +582,7 @@ public void forwardingError_errorRow() {
public void forwardingError_authError() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -616,7 +616,7 @@ public void forwardingError_authError() {
public void forwardingError_throwsError() {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenThrow(new RuntimeException("Network Error"));

// When:
Expand All @@ -643,7 +643,7 @@ public void forwardingError_throwsError() {
public void forwardingError_cancelled() throws ExecutionException, InterruptedException {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(a -> {
Consumer<List<StreamedRow>> rowConsumer = a.getArgument(4);
rowConsumer.accept(
Expand Down Expand Up @@ -675,7 +675,7 @@ public void forwardingError_cancelled() throws ExecutionException, InterruptedEx
public void forwardingError_noRows() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -710,7 +710,7 @@ public void forwardingError_noRows() {
public void forwardingError_invalidSchema() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -66,8 +65,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -78,8 +77,7 @@ RestResponse<Integer> makeQueryRequest(
Map<String, ?> configOverrides,
Map<String, ?> requestProperties,
Consumer<List<StreamedRow>> rowConsumer,
CompletableFuture<Void> shouldCloseConnection,
Optional<String> serializedOffsetVector
CompletableFuture<Void> shouldCloseConnection
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
final KsqlTarget target = sharedClient
.target(serverEndPoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -93,8 +92,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector) {
final CompletableFuture<Void> shouldCloseConnection) {
return getClient().makeQueryRequest(serverEndPoint, sql, configOverrides, requestProperties,
rowConsumer, shouldCloseConnection, serializedOffsetVector);
rowConsumer, shouldCloseConnection);
}

@Override
Expand Down

0 comments on commit 20c2d2f

Please sign in to comment.