diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 5b8c3c907b..bb508b726b 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -24,6 +24,7 @@ package com.datastax.oss.driver.core.resolver; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,6 +34,8 @@ import com.datastax.oss.driver.api.core.config.TypedDriverOption; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.categories.IsolatedTests; @@ -43,6 +46,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -251,6 +255,201 @@ public void run_replace_test_20_times() { } } + // This is too long to run during CI, but is useful for manual investigations. + @SuppressWarnings("unused") + public void cannot_reconnect_with_resolved_socket() { + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader); + CqlSession session; + Collection nodes; + Set filteredNodes; + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) { + RESOLVER_FACTORY.updateResponse( + "test.cluster.fake", + new ValidResponse( + new InetAddress[] { + getNodeInetAddress(ccmBridge, 1), + getNodeInetAddress(ccmBridge, 2), + getNodeInetAddress(ccmBridge, 3) + })); + ccmBridge.create(); + ccmBridge.start(); + session = builder.build(); + long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; + while (System.currentTimeMillis() < endTime) { + try { + nodes = session.getMetadata().getNodes().values(); + int upNodes = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + upNodes++; + } + } + if (upNodes == 3) { + break; + } + // session.refreshSchema(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.executeAsync(statement); + Thread.sleep(3000); + } catch (InterruptedException e) { + break; + } + } + ResultSet rs = session.execute("SELECT * FROM system.local"); + assertThat(rs).isNotNull(); + Row row = rs.one(); + assertThat(row).isNotNull(); + nodes = session.getMetadata().getNodes().values(); + assertThat(nodes).hasSize(3); + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + LOG.trace("Metadata node: " + iterator.next().toString()); + } + filteredNodes = + nodes.stream() + .filter(x -> x.toString().contains("test.cluster.fake")) + .collect(Collectors.toSet()); + assertThat(filteredNodes).hasSize(1); + } + int counter = 0; + while (filteredNodes.size() == 1) { + counter++; + if (counter == 255) { + LOG.error("Completed 254 runs. Breaking."); + break; + } + LOG.warn( + "Launching another cluster until we lose resolved socket from metadata (run {}).", + counter); + try (CcmBridge ccmBridge = + CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) { + RESOLVER_FACTORY.updateResponse( + "test.cluster.fake", + new ValidResponse( + new InetAddress[] { + getNodeInetAddress(ccmBridge, 1), + getNodeInetAddress(ccmBridge, 2), + getNodeInetAddress(ccmBridge, 3) + })); + ccmBridge.create(); + ccmBridge.start(); + long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; + while (System.currentTimeMillis() < endTime) { + try { + nodes = session.getMetadata().getNodes().values(); + int upNodes = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + upNodes++; + } + } + if (upNodes == 3) { + break; + } + // session.refreshSchema(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.executeAsync(statement); + Thread.sleep(3000); + } catch (InterruptedException e) { + break; + } + } + /* + ResultSet rs = session.execute("SELECT * FROM system.local"); + assertThat(rs).isNotNull(); + Row row = rs.one(); + assertThat(row).isNotNull(); + */ + nodes = session.getMetadata().getNodes().values(); + assertThat(nodes).hasSize(3); + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + LOG.trace("Metadata node: " + iterator.next().toString()); + } + filteredNodes = + nodes.stream() + .filter(x -> x.toString().contains("test.cluster.fake")) + .collect(Collectors.toSet()); + if (filteredNodes.size() > 1) { + fail( + "Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen."); + } + } + } + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve(); + assertFalse(address.isUnresolved()); + } + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) { + RESOLVER_FACTORY.updateResponse( + "test.cluster.fake", + new ValidResponse( + new InetAddress[] { + getNodeInetAddress(ccmBridge, 1), + getNodeInetAddress(ccmBridge, 2), + getNodeInetAddress(ccmBridge, 3) + })); + // Now the driver should fail to reconnect since unresolved hostname is gone. + ccmBridge.create(); + ccmBridge.start(); + long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; + while (System.currentTimeMillis() < endTime) { + try { + nodes = session.getMetadata().getNodes().values(); + int upNodes = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + upNodes++; + } + } + if (upNodes == 3) { + break; + } + // session.refreshSchema(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.executeAsync(statement); + Thread.sleep(3000); + } catch (InterruptedException e) { + break; + } + } + /* + for (int i = 0; i < 15; i++) { + try { + nodes = session.getMetadata().getNodes().values(); + if (nodes.size() == 3) { + break; + } + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + */ + session.execute("SELECT * FROM system.local"); + } + session.close(); + } + private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) { try { return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));