Skip to content

Commit

Permalink
added test for ws, made ct in client string
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Oct 26, 2021
1 parent 2237d63 commit d6544f2
Show file tree
Hide file tree
Showing 25 changed files with 315 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;

/**
* A single record, returned as part of a query result.
Expand Down Expand Up @@ -278,6 +277,4 @@ public interface Row {
* @throws IllegalArgumentException if the column name is invalid
*/
KsqlArray getKsqlArray(String columnName);

Optional<String> getSerializedConsistencyVector();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
Expand Down Expand Up @@ -66,6 +64,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
Expand All @@ -87,10 +86,11 @@ public class ClientImpl implements Client {
private final String basicAuthHeader;
private final boolean ownedVertx;
private final Map<String, Object> sessionVariables;
private final ConsistencyOffsetVector consistencyOffsetVector;
private final Map<String, Object> requestProperties;
private final LocalPropertyParser parser;

private AtomicReference<String> serializedConsistencyVector;

/**
* {@code Client} instances should be created via {@link Client#create(ClientOptions)}, NOT via
* this constructor.
Expand All @@ -117,7 +117,7 @@ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx,
this.serverSocketAddress =
SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost());
this.sessionVariables = new HashMap<>();
this.consistencyOffsetVector = new ConsistencyOffsetVector();
this.serializedConsistencyVector = new AtomicReference<>();
this.requestProperties = new HashMap<>();
this.parser = new LocalPropertyParser();
}
Expand All @@ -134,12 +134,13 @@ public CompletableFuture<StreamedQueryResult> streamQuery(
) {
if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) {
requestProperties.put(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN,
getConsistencyOffsetVector().serialize());
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
serializedConsistencyVector.get());
}
final CompletableFuture<StreamedQueryResult> cf = new CompletableFuture<>();
makeQueryRequest(sql, properties, cf,
(ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut));
(ctx, rp, fut, req) -> new StreamQueryResponseHandler(
ctx, rp, fut, serializedConsistencyVector));
return cf;
}

Expand All @@ -155,16 +156,17 @@ public BatchedQueryResult executeQuery(
) {
if (ConsistencyOffsetVector.isConsistencyVectorEnabled(requestProperties)) {
requestProperties.put(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_TOKEN,
getConsistencyOffsetVector().serialize());
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_CONSISTENCY_OFFSET_VECTOR,
serializedConsistencyVector.get());
}
final BatchedQueryResult result = new BatchedQueryResultImpl();
makeQueryRequest(
sql,
properties,
result,
(context, recordParser, cf, request) -> new ExecuteQueryResponseHandler(
context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows())
context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows(),
serializedConsistencyVector)
);
return result;
}
Expand Down Expand Up @@ -440,24 +442,15 @@ public Map<String, Object> getVariables() {
return new HashMap<>(sessionVariables);
}

public Object getRequestProperty(final String key) {
return requestProperties.get(key);
}

public void setRequestProperty(final String key, final Object value) {
Objects.requireNonNull(value, "value");
final Object parsed = parser.parse(key, value);
requestProperties.put(key, parsed);
}

public Map<String, Object> toMap() {
return ImmutableMap.copyOf(requestProperties);
}

@VisibleForTesting
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public ConsistencyOffsetVector getConsistencyOffsetVector() {
return consistencyOffsetVector;
public String getSerializedConsistencyVector() {
return serializedConsistencyVector.get();
}

@Override
Expand Down Expand Up @@ -585,15 +578,14 @@ private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader);
}

private static <T extends CompletableFuture<?>> void handleStreamedResponse(
private <T extends CompletableFuture<?>> void handleStreamedResponse(
final HttpClientResponse response,
final T cf,
final StreamedResponseHandlerSupplier<T> responseHandlerSupplier) {
if (response.statusCode() == OK.code()) {
final RecordParser recordParser = RecordParser.newDelimited("\n", response);
final ResponseHandler<T> responseHandler =
responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request());

recordParser.handler(responseHandler::handleBodyBuffer);
recordParser.endHandler(responseHandler::handleBodyEnd);
recordParser.exceptionHandler(responseHandler::handleException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,15 +42,18 @@ public class ExecuteQueryResponseHandler extends QueryResponseHandler<BatchedQue
private List<String> columnNames;
private List<ColumnType> columnTypes;
private Map<String, Integer> columnNameToIndex;
private AtomicReference<String> serializedConsistencyVector;

ExecuteQueryResponseHandler(
final Context context,
final RecordParser recordParser,
final BatchedQueryResult cf,
final int maxRows) {
final int maxRows,
final AtomicReference<String> serializedCV) {
super(context, recordParser, cf);
this.maxRows = maxRows;
this.rows = new ArrayList<>();
this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
}

@Override
Expand All @@ -65,8 +69,7 @@ protected void handleRow(final Buffer buff) {
final Row row;
final Object json = buff.toJson();
if (json instanceof String) {
row = new RowImpl(Optional.of((String)json));
addToList(row);
serializedConsistencyVector.set((String)json);
} else if (json instanceof JsonArray) {
final JsonArray values = new JsonArray(buff);
row = new RowImpl(columnNames, columnTypes, values, columnNameToIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class RowImpl implements Row {

private final ImmutableList<String> columnNames;
private final ImmutableList<ColumnType> columnTypes;
private final KsqlArray values;
private final ImmutableMap<String, Integer> columnNameToIndex;
private final Optional<String> serializedConsistencyVector;

public RowImpl(
final List<String> columnNames,
Expand All @@ -46,15 +44,6 @@ public RowImpl(
this.columnTypes = ImmutableList.copyOf(Objects.requireNonNull(columnTypes));
this.values = new KsqlArray(Objects.requireNonNull(values).getList());
this.columnNameToIndex = ImmutableMap.copyOf(Objects.requireNonNull(columnNameToIndex));
this.serializedConsistencyVector = Optional.empty();
}

public RowImpl(final Optional<String> serializedConsistencyVector) {
this.columnNames = null;
this.columnTypes = null;
this.values = null;
this.columnNameToIndex = null;
this.serializedConsistencyVector = Objects.requireNonNull(serializedConsistencyVector);
}

@Override
Expand Down Expand Up @@ -189,11 +178,6 @@ public KsqlArray getKsqlArray(final String columnName) {
return getKsqlArray(indexFromName(columnName));
}

@Override
public Optional<String> getSerializedConsistencyVector() {
return serializedConsistencyVector;
}

private int indexFromName(final String columnName) {
final Integer index = columnNameToIndex.get(columnName);
if (index == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class StreamQueryResponseHandler
extends QueryResponseHandler<CompletableFuture<StreamedQueryResult>> {

private StreamedQueryResultImpl queryResult;
private Map<String, Integer> columnNameToIndex;
private boolean paused;
private AtomicReference<String> serializedConsistencyVector;

StreamQueryResponseHandler(final Context context, final RecordParser recordParser,
final CompletableFuture<StreamedQueryResult> cf) {
final CompletableFuture<StreamedQueryResult> cf,
final AtomicReference<String> serializedCV) {
super(context, recordParser, cf);
this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
}

@Override
Expand All @@ -58,17 +62,12 @@ protected void handleRow(final Buffer buff) {
if (queryResult == null) {
throw new IllegalStateException("handleRow called before metadata processed");
}

final Object json = buff.toJson();
final Row row;
if (json instanceof String) {
row = new RowImpl(Optional.of((String)json));
final boolean full = queryResult.accept(row);
if (full && !paused) {
recordParser.pause();
queryResult.drainHandler(this::publisherReceptive);
paused = true;
}
// This is the serialized consistency vector
// Don't add it to the publisher's buffer since the user should not see it
serializedConsistencyVector.set((String)json);
} else if (json instanceof JsonArray) {
row = new RowImpl(
queryResult.columnNames(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public void handleError(final Exception e) {
sendError(e);
}

public boolean isPolling() {
return polling;
}

private void handleErrorWhilePolling(final Throwable t) {
log.error("Unexpected error while polling: " + t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.util.RowUtil;
import io.netty.buffer.ByteBuf;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.junit.Before;
Expand Down Expand Up @@ -63,6 +61,7 @@ public class RowImplTest {
.add("04:40:34.789");

private RowImpl row;
private RowImpl rowWithCV;

@Before
public void setUp() {
Expand Down Expand Up @@ -224,5 +223,4 @@ public void shouldImplementHashCodeAndEquals() {
)
.testEquals();
}

}
Loading

0 comments on commit d6544f2

Please sign in to comment.