Skip to content

Commit

Permalink
chore: passing consistency token using request properties in HARouting
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Stuedi committed Nov 16, 2021
1 parent 9ae6a70 commit 5315afb
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({"checkstyle:ClassDataAbstractionCoupling", "checkstyle:CyclomaticComplexity"})
public final class HARouting implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);
Expand Down Expand Up @@ -340,10 +341,15 @@ private static void forwardTo(
.map(location -> Integer.toString(location.getPartition()))
.collect(Collectors.joining(","));
// Add skip forward flag to properties
final Map<String, Object> requestProperties = ImmutableMap.of(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<String, Object>()
.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true)
.put(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true)
.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
if (consistencyOffsetVector.isPresent()) {
builder.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
consistencyOffsetVector.get().serialize());
}
final Map<String, Object> requestProperties = builder.build();
final RestResponse<Integer> response;

try {
Expand All @@ -355,8 +361,7 @@ private static void forwardTo(
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests,
consistencyOffsetVector.map(ConsistencyOffsetVector::serialize)
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand Down Expand Up @@ -438,7 +443,11 @@ private static Consumer<List<StreamedRow>> streamedRowsHandler(
}

if (!row.getRow().isPresent()) {
throw new KsqlException("Missing row data on row " + i + " of chunk");
if (row.getConsistencyToken().isPresent()) {
continue;
} else {
throw new KsqlException("Missing row data on row " + i + " of chunk");
}
}

final List<?> r = row.getRow().get().getColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -229,7 +229,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());
assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW1));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -248,7 +248,7 @@ public void shouldCallRouteQuery_twoRound() throws InterruptedException, Executi
throw new StandbyFallbackException("Error!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -285,7 +285,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -309,7 +309,7 @@ public void shouldCallRouteQuery_partitionFailure() throws InterruptedException,
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());

when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -350,7 +350,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(4));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -369,7 +369,7 @@ public void shouldCallRouteQuery_twoRound_networkError()
throws InterruptedException, ExecutionException {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
throw new RuntimeException("Network error!");
}
Expand All @@ -387,7 +387,7 @@ public void shouldCallRouteQuery_twoRound_networkError()

// Then:
verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any(), any());
any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location2.removeHeadHost())), any(), any(), any());

assertThat(pullQueryQueue.size(), is(1));
Expand All @@ -409,7 +409,7 @@ public void shouldCallRouteQuery_allStandbysFail() {
doAnswer(i -> {
throw new StandbyFallbackException("Error1!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -453,7 +453,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(e.getCause().getMessage(), containsString("Exhausted standby hosts to try."));

Expand Down Expand Up @@ -514,7 +514,7 @@ public void shouldCallRouteQuery_allFilteredWithCause() {
public void shouldNotRouteToFilteredHost() throws InterruptedException, ExecutionException {
// Given:
location1 = new PartitionLocation(Optional.empty(), 1, ImmutableList.of(badNode, node1));
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any()))
.then(invocationOnMock -> RestResponse.successful(200, 2));
locate(location1, location2, location3, location4);

Expand All @@ -533,7 +533,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio

// Then:
verify(ksqlClient, never())
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any(), any());
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any());

final double fetch_count = getMetricValue("-partition-fetch-count");
final double resubmission_count = getMetricValue("-partition-fetch-resubmission-count");
Expand All @@ -545,7 +545,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio
public void forwardingError_errorRow() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -582,7 +582,7 @@ public void forwardingError_errorRow() {
public void forwardingError_authError() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -616,7 +616,7 @@ public void forwardingError_authError() {
public void forwardingError_throwsError() {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenThrow(new RuntimeException("Network Error"));

// When:
Expand All @@ -643,7 +643,7 @@ public void forwardingError_throwsError() {
public void forwardingError_cancelled() throws ExecutionException, InterruptedException {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(a -> {
Consumer<List<StreamedRow>> rowConsumer = a.getArgument(4);
rowConsumer.accept(
Expand Down Expand Up @@ -675,7 +675,7 @@ public void forwardingError_cancelled() throws ExecutionException, InterruptedEx
public void forwardingError_noRows() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -710,7 +710,7 @@ public void forwardingError_noRows() {
public void forwardingError_invalidSchema() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -66,8 +65,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -78,8 +77,7 @@ RestResponse<Integer> makeQueryRequest(
Map<String, ?> configOverrides,
Map<String, ?> requestProperties,
Consumer<List<StreamedRow>> rowConsumer,
CompletableFuture<Void> shouldCloseConnection,
Optional<String> serializedOffsetVector
CompletableFuture<Void> shouldCloseConnection
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
final KsqlTarget target = sharedClient
.target(serverEndPoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -93,8 +92,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException();
}
Expand Down
Loading

0 comments on commit 5315afb

Please sign in to comment.