Skip to content

Commit

Permalink
Added tests for query
Browse files Browse the repository at this point in the history
  • Loading branch information
chernser committed Aug 28, 2024
1 parent 552d8b3 commit 4e23300
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.client;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.ClientFaultCause;
import com.clickhouse.client.api.ConnectionInitiationException;
import com.clickhouse.client.api.ConnectionReuseStrategy;
Expand All @@ -21,6 +22,7 @@
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.net.URIBuilder;
import org.testcontainers.utility.ThrowingFunction;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -33,13 +35,18 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;

public class HttpTransportTests extends BaseIntegrationTest{

static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
}

@Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider")
@SuppressWarnings("java:S2925")
Expand Down Expand Up @@ -219,26 +226,24 @@ public void testSecureConnection() {
}
}

@Test(groups = { "integration" }, enabled = true)
public void testNoHttpResponseFailure() {
@Test(groups = { "integration" }, dataProvider = "NoResponseFailureProvider")
public void testInsertAndNoHttpResponseFailure(String body, int maxRetries, ThrowingFunction<Client, Void> function,
boolean shouldFail) {
WireMockServer faultyServer = new WireMockServer( WireMockConfiguration
.options().port(9090).notifier(new ConsoleNotifier(false)));
faultyServer.start();

byte[] requestBody = ("INSERT INTO table01 FORMAT " +
ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes();

// First request gets no response
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.binaryEqualTo(requestBody))
.withRequestBody(WireMock.equalTo(body))
.inScenario("Retry")
.whenScenarioStateIs(STARTED)
.willSetStateTo("Failed")
.willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build());

// Second request gets a response (retry)
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.binaryEqualTo(requestBody))
.withRequestBody(WireMock.equalTo(body))
.inScenario("Retry")
.whenScenarioStateIs("Failed")
.willSetStateTo("Done")
Expand All @@ -250,20 +255,53 @@ public void testNoHttpResponseFailure() {
.addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false)
.setUsername("default")
.setPassword("")
.useNewImplementation(true)
// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
.useNewImplementation(true) // because of the internal differences
.compressClientRequest(false)
.setOption(ClickHouseClientOption.RETRY.getKey(), "2")
.setMaxRetries(maxRetries)
.build();

try {
InsertResponse insertResponse = mockServerClient.insert("table01",
new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS);
insertResponse.close();
function.apply(mockServerClient);
} catch (ClientException e) {
e.printStackTrace();
if (!shouldFail) {
Assert.fail("Unexpected exception", e);
}
return;
} catch (Exception e) {
Assert.fail("Unexpected exception", e);
} finally {
faultyServer.stop();
}

if (shouldFail) {
Assert.fail("Expected exception");
}
}

@DataProvider(name = "NoResponseFailureProvider")
public static Object[][] noResponseFailureProvider() {

String insertBody = "INSERT INTO table01 FORMAT " + ClickHouseFormat.TSV.name() + " \n1\t2\t3\n";
ThrowingFunction<Client, Void> insertFunction = (client) -> {
InsertResponse insertResponse = client.insert("table01",
new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS);
insertResponse.close();
return null;
};

String selectBody = "select timezone()";
ThrowingFunction<Client, Void> queryFunction = (client) -> {
QueryResponse response = client.query("select timezone()").get(30, TimeUnit.SECONDS);
response.close();
return null;
};

return new Object[][]{
{insertBody, 1, insertFunction, false},
{selectBody, 1, queryFunction, false},
{insertBody, 0, insertFunction, true},
{selectBody, 0, queryFunction, true}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,51 +169,4 @@ public void insertRawDataSimple(int numberOfRecords) throws Exception {
OperationMetrics metrics = response.getMetrics();
assertEquals((int)response.getWrittenRows(), numberOfRecords );
}

@Test(groups = { "integration" }, enabled = true)
public void testNoHttpResponseFailure() {
WireMockServer faultyServer = new WireMockServer( WireMockConfiguration
.options().port(9090).notifier(new ConsoleNotifier(false)));
faultyServer.start();

byte[] requestBody = ("INSERT INTO table01 FORMAT " +
ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes();

// First request gets no response
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.binaryEqualTo(requestBody))
.inScenario("Retry")
.whenScenarioStateIs(STARTED)
.willSetStateTo("Failed")
.willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build());

// Second request gets a response (retry)
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.binaryEqualTo(requestBody))
.inScenario("Retry")
.whenScenarioStateIs("Failed")
.willSetStateTo("Done")
.willReturn(WireMock.aResponse()
.withHeader("X-ClickHouse-Summary",
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());

Client mockServerClient = new Client.Builder()
.addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false)
.setUsername("default")
.setPassword("")
.useNewImplementation(true)
// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
.compressClientRequest(false)
.setOption(ClickHouseClientOption.RETRY.getKey(), "2")
.build();
try {
InsertResponse insertResponse = mockServerClient.insert("table01",
new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS);
insertResponse.close();
} catch (Exception e) {
Assert.fail("Unexpected exception", e);
} finally {
faultyServer.stop();
}
}
}

0 comments on commit 4e23300

Please sign in to comment.