Skip to content

Commit

Permalink
Add optional fallback for ControlConnection#reconnect()
Browse files Browse the repository at this point in the history
Adds an experimental option to allow `ControlConnection` to try
reconnecting to the original contact points held by `MetadataManager`,
in case of getting empty query plan from the load balancing policy.

In order to separate this logic from query plans of other queries
`LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced
and is called during reconnection in place of `newQueryPlan()`.
  • Loading branch information
Bouncheck committed Sep 12, 2024
1 parent f319d3d commit 88ed886
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),

/**
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
* plan, in case there is no live nodes available according to LBP. Experimental.
*
* <p>Value-type: boolean
*/
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
"advanced.control-connection.reconnection.fallback-to-original-contacts"),

/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
/** Whether to forcibly try original contacts if no live nodes are available */
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private void init(

private CompletionStage<Boolean> reconnect() {
assert adminExecutor.inEventLoop();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture<Boolean> result = new CompletableFuture<>();
connect(
nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
Expand Down Expand Up @@ -165,6 +166,28 @@ public Queue<Node> newQueryPlan() {
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
}

@NonNull
public Queue<Node> newControlReconnectionQueryPlan() {
// First try the original way
Queue<Node> regularQueryPlan = newQueryPlan();
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;

if (context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
List<Node> nodes = new ArrayList<>();
for (DefaultNode node : originalNodes) {
nodes.add(new DefaultNode(node.getEndPoint(), context));
}
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
} else {
return regularQueryPlan;
}
}

// when it comes in from the outside
private void onNodeStateEvent(NodeStateEvent event) {
eventFilter.accept(event);
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}

reconnection {
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
# in case there is no live nodes available according to LBP.
# Experimental.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
# Overridable in a profile: no
fallback-to-original-contacts = false
}
}

advanced.prepared-statements {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public void setup() {
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
.thenReturn(false);

when(context.getConfig()).thenReturn(config);
when(config.getDefaultProfile()).thenReturn(defaultProfile);
when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
.thenReturn(false);

controlConnection = new ControlConnection(context);
}

Expand All @@ -145,6 +150,15 @@ protected void mockQueryPlan(Node... nodes) {
}
return queryPlan;
});
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
for (Node node : nodes) {
queryPlan.offer(node);
}
return queryPlan;
});
}

@After
Expand Down

0 comments on commit 88ed886

Please sign in to comment.