Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: passing consistency token using request properties in HARouting #8312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({"checkstyle:ClassDataAbstractionCoupling", "checkstyle:CyclomaticComplexity"})
public final class HARouting implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);
Expand Down Expand Up @@ -340,10 +341,15 @@ private static void forwardTo(
.map(location -> Integer.toString(location.getPartition()))
.collect(Collectors.joining(","));
// Add skip forward flag to properties
final Map<String, Object> requestProperties = ImmutableMap.of(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<String, Object>()
.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true)
.put(KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true)
.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
if (consistencyOffsetVector.isPresent()) {
builder.put(KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
consistencyOffsetVector.get().serialize());
}
final Map<String, Object> requestProperties = builder.build();
final RestResponse<Integer> response;

try {
Expand All @@ -355,8 +361,7 @@ private static void forwardTo(
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests,
consistencyOffsetVector.map(ConsistencyOffsetVector::serialize)
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand Down Expand Up @@ -438,7 +443,11 @@ private static Consumer<List<StreamedRow>> streamedRowsHandler(
}

if (!row.getRow().isPresent()) {
throw new KsqlException("Missing row data on row " + i + " of chunk");
if (row.getConsistencyToken().isPresent()) {
continue;
} else {
throw new KsqlException("Missing row data on row " + i + " of chunk");
}
}

final List<?> r = row.getRow().get().getColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -229,7 +229,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());
assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW1));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -248,7 +248,7 @@ public void shouldCallRouteQuery_twoRound() throws InterruptedException, Executi
throw new StandbyFallbackException("Error!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
doNothing().when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -285,7 +285,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -309,7 +309,7 @@ public void shouldCallRouteQuery_partitionFailure() throws InterruptedException,
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());

when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -350,7 +350,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(3)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(4));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -369,7 +369,7 @@ public void shouldCallRouteQuery_twoRound_networkError()
throws InterruptedException, ExecutionException {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
throw new RuntimeException("Network error!");
}
Expand All @@ -387,7 +387,7 @@ public void shouldCallRouteQuery_twoRound_networkError()

// Then:
verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any(), any());
any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location2.removeHeadHost())), any(), any(), any());

assertThat(pullQueryQueue.size(), is(1));
Expand All @@ -409,7 +409,7 @@ public void shouldCallRouteQuery_allStandbysFail() {
doAnswer(i -> {
throw new StandbyFallbackException("Error1!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any())).thenAnswer(
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any())).thenAnswer(
new Answer() {
private int count = 0;

Expand Down Expand Up @@ -453,7 +453,7 @@ public Object answer(InvocationOnMock i) {
// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1)), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location3)), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any());
verify(ksqlClient, times(4)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any());

assertThat(e.getCause().getMessage(), containsString("Exhausted standby hosts to try."));

Expand Down Expand Up @@ -514,7 +514,7 @@ public void shouldCallRouteQuery_allFilteredWithCause() {
public void shouldNotRouteToFilteredHost() throws InterruptedException, ExecutionException {
// Given:
location1 = new PartitionLocation(Optional.empty(), 1, ImmutableList.of(badNode, node1));
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any()))
.then(invocationOnMock -> RestResponse.successful(200, 2));
locate(location1, location2, location3, location4);

Expand All @@ -533,7 +533,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio

// Then:
verify(ksqlClient, never())
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any(), any());
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any());

final double fetch_count = getMetricValue("-partition-fetch-count");
final double resubmission_count = getMetricValue("-partition-fetch-resubmission-count");
Expand All @@ -545,7 +545,7 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio
public void forwardingError_errorRow() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -582,7 +582,7 @@ public void forwardingError_errorRow() {
public void forwardingError_authError() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -616,7 +616,7 @@ public void forwardingError_authError() {
public void forwardingError_throwsError() {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenThrow(new RuntimeException("Network Error"));

// When:
Expand All @@ -643,7 +643,7 @@ public void forwardingError_throwsError() {
public void forwardingError_cancelled() throws ExecutionException, InterruptedException {
// Given:
locate(location5);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(a -> {
Consumer<List<StreamedRow>> rowConsumer = a.getArgument(4);
rowConsumer.accept(
Expand Down Expand Up @@ -675,7 +675,7 @@ public void forwardingError_cancelled() throws ExecutionException, InterruptedEx
public void forwardingError_noRows() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down Expand Up @@ -710,7 +710,7 @@ public void forwardingError_noRows() {
public void forwardingError_invalidSchema() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -66,8 +65,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -78,8 +77,7 @@ RestResponse<Integer> makeQueryRequest(
Map<String, ?> configOverrides,
Map<String, ?> requestProperties,
Consumer<List<StreamedRow>> rowConsumer,
CompletableFuture<Void> shouldCloseConnection,
Optional<String> serializedOffsetVector
CompletableFuture<Void> shouldCloseConnection
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
final KsqlTarget target = sharedClient
.target(serverEndPoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -93,8 +92,7 @@ public RestResponse<Integer> makeQueryRequest(
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection,
final Optional<String> serializedOffsetVector
final CompletableFuture<Void> shouldCloseConnection
) {
throw new UnsupportedOperationException();
}
Expand Down
Loading