Skip to content

Commit

Permalink
fix: Revert "chore: upgrade vertx to 4.2 (#8975)" (#9003)
Browse files Browse the repository at this point in the history
This reverts commit f5a4f60.
  • Loading branch information
jnh5y authored Apr 8, 2022
1 parent 57f3596 commit d4214de
Show file tree
Hide file tree
Showing 40 changed files with 856 additions and 562 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
Expand Down Expand Up @@ -605,44 +604,26 @@ private <T extends CompletableFuture<?>> void makeRequest(
final Handler<HttpClientResponse> responseHandler,
final boolean endRequest,
final HttpMethod method) {
final RequestOptions options = new RequestOptions();
options.setMethod(method);
options.setServer(serverSocketAddress);
options.setPort(clientOptions.getPort());
options.setHost(clientOptions.getHost());
options.setURI(path);

httpClient.request(options, ar -> {
if (ar.failed()) {
cf.completeExceptionally(ar.cause());
}

HttpClientRequest request = ar.result();
request.response(response -> {
if (response.failed()) {
cf.completeExceptionally(response.cause());
}

responseHandler.handle(response.result());
});
request.exceptionHandler(cf::completeExceptionally);

request = configureUserAgent(request);
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
if (clientOptions.getRequestHeaders() != null) {
for (final Entry<String, String> entry : clientOptions.getRequestHeaders().entrySet()) {
request.putHeader(entry.getKey(), entry.getValue());
}
}
if (endRequest) {
request.end(requestBody);
} else {
final HttpClientRequest finalRequest = request;
finalRequest.sendHead(version -> finalRequest.writeCustomFrame(0, 0, requestBody));
HttpClientRequest request = httpClient.request(method,
serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(),
path,
responseHandler)
.exceptionHandler(cf::completeExceptionally);
request = configureUserAgent(request);
if (clientOptions.isUseBasicAuth()) {
request = configureBasicAuth(request);
}
if (clientOptions.getRequestHeaders() != null) {
for (final Entry<String, String> entry : clientOptions.getRequestHeaders().entrySet()) {
request.putHeader(entry.getKey(), entry.getValue());
}
});
}
if (endRequest) {
request.end(requestBody);
} else {
final HttpClientRequest finalRequest = request;
finalRequest.sendHead(version -> finalRequest.writeCustomFrame(0, 0, requestBody));
}
}

private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.rest.ApiJsonMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.api.client.util.JsonMapper;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
Expand All @@ -24,6 +25,8 @@

abstract class QueryResponseHandler<T extends CompletableFuture<?>> extends ResponseHandler<T> {

private static final ObjectMapper JSON_MAPPER = JsonMapper.get();

protected boolean hasReadArguments;

QueryResponseHandler(final Context context, final RecordParser recordParser, final T cf) {
Expand Down Expand Up @@ -59,8 +62,7 @@ private void handleArgs(final Buffer buff) {

final QueryResponseMetadata queryResponseMetadata;
try {
queryResponseMetadata = ApiJsonMapper.INSTANCE.get()
.readValue(buff.getBytes(), QueryResponseMetadata.class);
queryResponseMetadata = JSON_MAPPER.readValue(buff.getBytes(), QueryResponseMetadata.class);
} catch (Exception e) {
cf.completeExceptionally(e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ public class StreamQueryResponseHandler
private Map<String, Object> properties;
private ClientImpl client;

StreamQueryResponseHandler(
final Context context,
final RecordParser recordParser,
StreamQueryResponseHandler(final Context context, final RecordParser recordParser,
final CompletableFuture<StreamedQueryResult> cf,
final AtomicReference<String> serializedCV,
final AtomicReference<String> continuationToken,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.util;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

public final class JsonMapper {

private static final ObjectMapper MAPPER = new ObjectMapper();

static {
MAPPER.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true))
.registerModule(new Jdk8Module());
}

private JsonMapper() {

}

public static ObjectMapper get() {
return MAPPER.copy();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.RequestOptions;
import java.util.HashMap;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -76,11 +71,7 @@ public void shouldSetUserAgent() {

// When
when(vertx.createHttpClient(any())).thenReturn(httpClient);
doAnswer(a -> {
((Handler<AsyncResult<HttpClientRequest>>) a.getArgument(1))
.handle(Future.succeededFuture(clientRequest));
return null;
}).when(httpClient).request(any(RequestOptions.class), any(Handler.class));
when(httpClient.request(any(), any(), anyInt(), any(), any(), any())).thenReturn(clientRequest);
when(clientRequest.exceptionHandler(any())).thenReturn(clientRequest);
when(clientRequest.putHeader((String) any(), (String) any())).thenAnswer(a -> {
String key = a.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -74,7 +73,6 @@
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
Expand All @@ -90,11 +88,9 @@
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.TestDataProvider;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -1341,7 +1337,7 @@ private static List<KsqlArray> convertToClientRows(
}

// Add header column
addObjectToKsqlArray(expectedRow, new byte[] {23});
expectedRow.add(new byte[] {23});

expectedRows.add(expectedRow);
}
Expand All @@ -1361,33 +1357,11 @@ private static void addObjectToKsqlArray(final KsqlArray array, final Object val
array.add(SqlTimeTypes.formatDate((Date) value));
} else if (value instanceof Time) {
array.add(SqlTimeTypes.formatTime((Time) value));
} else if (value instanceof byte[]) {
array.add(serializeVertX3CompatibleByte((byte[]) value));
} else {
array.add(value);
}
}

/**
* VertX 4 changed to serialize using Base64 without padding but our server
* still encodes the data with padding - this uses the same serializer that
* the server uses to serialize the data into a string. Note that this issue
* only affects tests because the test uses string comparison to compare the
* bytes (it's schemaless, so the byte[] comes back as a string from the server).
*
* @see <a href=https://github.com/eclipse-vertx/vert.x/pull/3197>vertx#3197</a>
*/
private static String serializeVertX3CompatibleByte(final byte[] bytes) {
try {
// writeValueAsString by default adds quotes to both sides to make it valid
// JSON
final String escaped = ApiJsonMapper.INSTANCE.get().writeValueAsString(bytes);
return escaped.substring(1, escaped.length() - 1);
} catch (JsonProcessingException e) {
throw new KsqlException(e);
}
}

private static Matcher<? super StreamInfo> streamForProvider(
final TestDataProvider testDataProvider
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import io.confluent.common.utils.IntegrationTest;
Expand All @@ -49,7 +48,6 @@
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.KsqlRestConfig;
Expand All @@ -58,7 +56,6 @@
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.StructuredTypesDataProvider.Batch;
Expand Down Expand Up @@ -363,7 +360,7 @@ private static List<KsqlArray> convertToClientRows(
}

// Add header column
addObjectToKsqlArray(expectedRow, new byte[] {23});
expectedRow.add(new byte[] {23});

expectedRows.add(expectedRow);
}
Expand All @@ -383,31 +380,11 @@ private static void addObjectToKsqlArray(final KsqlArray array, final Object val
array.add(SqlTimeTypes.formatDate((Date) value));
} else if (value instanceof Time) {
array.add(SqlTimeTypes.formatTime((Time) value));
} else if (value instanceof byte[]) {
array.add(serializeVertX3CompatibleByte((byte[]) value));
} else {
array.add(value);
}
}

/**
* VertX 4 changed to serialize using Base64 without padding but our server
* still encodes the data with padding - this uses the same serializer that
* the server uses to serialize the data into a string
*
* @see <a href=https://github.com/eclipse-vertx/vert.x/pull/3197>vertx#3197</a>
*/
private static String serializeVertX3CompatibleByte(final byte[] bytes) {
try {
// writeValueAsString by default adds quotes to both sides to make it valid
// JSON
final String escaped = ApiJsonMapper.INSTANCE.get().writeValueAsString(bytes);
return escaped.substring(1, escaped.length() - 1);
} catch (JsonProcessingException e) {
throw new KsqlException(e);
}
}

private void assertExpectedScalablePushQueries(
final int expectedScalablePushQueries
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.Future;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -66,9 +65,8 @@ public void subscribe(final Subscriber<? super T> subscriber) {
}
}

public Future<Void> close() {
public void close() {
ctx.runOnContext(v -> doClose());
return Future.succeededFuture();
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "ctx should be mutable")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* General purpose utils (not limited to the server, could be used by client too) for the API
Expand All @@ -41,20 +40,7 @@ public static void checkContext(final Context context) {
}

public static boolean isEventLoopAndSameContext(final Context context) {
return Context.isOnEventLoopThread()
&& (context == Vertx.currentContext()
|| checkDuplicateContext(Vertx.currentContext(), context)
|| checkDuplicateContext(context, Vertx.currentContext()));
}

private static boolean checkDuplicateContext(final Context context, final Context other) {
// see https://github.com/eclipse-vertx/vert.x/issues/3300 - the recommendation from
// the VertX community is to always call runOnContext() despite the performance overhead
// instead of checking the context. this hack allows us to keep the same pattern we had
// from before the VertX 4 migration
return context instanceof ContextInternal
&& ((ContextInternal) context).isDuplicate()
&& (((ContextInternal) context).unwrap() == other);
return Context.isOnEventLoopThread() && context == Vertx.currentContext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ static CompletableFuture<RoutingResult> executeOrRouteQuery(
final String thisHostName
) {
if (node.isLocal()) {
LOG.info("Query with id {} executed locally at host {} at timestamp {}.",
pushPhysicalPlanManager.getQueryId(), node.location(),
LOG.info("Query {} id {} executed locally at host {} at timestamp {}.",
statement.getStatementText(), pushPhysicalPlanManager.getQueryId(), node.location(),
System.currentTimeMillis());
scalablePushQueryMetrics
.ifPresent(metrics -> metrics.recordLocalRequests(1));
Expand Down
Loading

0 comments on commit d4214de

Please sign in to comment.