Skip to content

Commit

Permalink
Add optional fallback for ControlConnection#reconnect()
Browse files Browse the repository at this point in the history
Adds an experimental option to allow `ControlConnection` to try
reconnecting to the original contact points held by `MetadataManager`,
in case of getting empty query plan from the load balancing policy.
  • Loading branch information
Bouncheck committed Sep 12, 2024
1 parent 0519c6f commit 57245e1
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),

/**
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
* plan, in case there is no live nodes available according to LBP. Experimental.
*
* <p>Value-type: boolean
*/
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
"advanced.control-connection.reconnection.fallback-to-original-contacts"),

/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
/** Whether to forcibly try original contacts if no live nodes are available */
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private void init(

private CompletionStage<Boolean> reconnect() {
assert adminExecutor.inEventLoop();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture<Boolean> result = new CompletableFuture<>();
connect(
nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
Expand Down Expand Up @@ -165,6 +166,24 @@ public Queue<Node> newQueryPlan() {
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
}

@NonNull
public Queue<Node> newControlReconnectionQueryPlan() {
// First try the original way
Queue<Node> regularQueryPlan = newQueryPlan();
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;

if (context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
List<Node> nodes = new ArrayList<>(context.getMetadataManager().getContactPoints());
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
} else {
return regularQueryPlan;
}
}

// when it comes in from the outside
private void onNodeStateEvent(NodeStateEvent event) {
eventFilter.accept(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void completeOrReschedule(Set<UUID> uuids, Throwable error) {
f -> {
if (!f.isSuccess()) {
LOG.debug(
"[{}] Error while rescheduling schema agreement, completing now (false)",
"[{}] listesner Error while rescheduling schema agreement, completing now (false)",
logPrefix,
f.cause());
}
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}

reconnection {
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
# in case there is no live nodes available according to LBP.
# Experimental.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
# Overridable in a profile: no
fallback-to-original-contacts = false
}
}

advanced.prepared-statements {
Expand Down

0 comments on commit 57245e1

Please sign in to comment.