Skip to content

Commit

Permalink
Add MockResolverIT#cannot_reconnect_with_resolved_socket()
Browse files Browse the repository at this point in the history
Adds an useful method for testing the issues that surface after cluster
replacements. Due to the variable, sometimes long runtime it is not added
to any of the test groups.
  • Loading branch information
Bouncheck committed Sep 12, 2024
1 parent eba900e commit f319d3d
Showing 1 changed file with 199 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -33,6 +34,8 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
Expand All @@ -43,6 +46,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -251,6 +255,201 @@ public void run_replace_test_20_times() {
}
}

// This is too long to run during CI, but is useful for manual investigations.
@SuppressWarnings("unused")
public void cannot_reconnect_with_resolved_socket() {
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("test.cluster.fake:9042"))
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
CqlSession session;
Collection<Node> nodes;
Set<Node> filteredNodes;
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
ccmBridge.create();
ccmBridge.start();
session = builder.build();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
}
int counter = 0;
while (filteredNodes.size() == 1) {
counter++;
if (counter == 255) {
LOG.error("Completed 254 runs. Breaking.");
break;
}
LOG.warn(
"Launching another cluster until we lose resolved socket from metadata (run {}).",
counter);
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
/*
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
*/
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
if (filteredNodes.size() > 1) {
fail(
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
}
}
}
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
assertFalse(address.isUnresolved());
}
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
/*
for (int i = 0; i < 15; i++) {
try {
nodes = session.getMetadata().getNodes().values();
if (nodes.size() == 3) {
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
*/
session.execute("SELECT * FROM system.local");
}
session.close();
}

private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) {
try {
return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));
Expand Down

0 comments on commit f319d3d

Please sign in to comment.