Skip to content

Commit

Permalink
Tablets support
Browse files Browse the repository at this point in the history
Introduces basic tablets support for version 4.x of the driver.
Metadata about tablets will be kept in TabletMap that gets continuously updated
through the tablets-routing-v1 extension. Each time the BoundStatement targets
the wrong node and shard combination the server supporting tablets should
respond with tablet metadata inside custom payload of its response.
This metadata will be transparently processed and used for future queries.

Tablets metadata will by enabled by default. Until now driver was using
TokenMaps to choose replicas and appropriate shards. Having a token was enough
information to do that. Now driver will first attempt tablet-based lookup
and only after failing to find corresponding tablet it will defer to TokenMap
lookup. Since to find a correct tablet besides the token we need the keyspace
and table names, many of the methods were extended to also accept those
as parameters.

RequestHandlerTestHarness was adjusted to mock also MetadataManager.
Before it used to mock only `session.getMetadata()` call but the same can
be obtained by `context.getMetadataManager().getMetadata()`. Using the
second method was causing test failures.
  • Loading branch information
Bouncheck committed Jun 17, 2024
1 parent 7acacbb commit 07dfa3e
Show file tree
Hide file tree
Showing 21 changed files with 967 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/** Simple keyspace name and table name pair. */
public class KeyspaceTableNamePair {
@NonNull private final CqlIdentifier keyspace;
@NonNull private final CqlIdentifier tableName;

public KeyspaceTableNamePair(@NonNull CqlIdentifier keyspace, @NonNull CqlIdentifier tableName) {
this.keyspace = keyspace;
this.tableName = tableName;
}

@NonNull
public CqlIdentifier getKeyspace() {
return keyspace;
}

@NonNull
public CqlIdentifier getTableName() {
return tableName;
}

@Override
public String toString() {
return "KeyspaceTableNamePair{"
+ "keyspace='"
+ keyspace
+ '\''
+ ", tableName='"
+ tableName
+ '\''
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(keyspace, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
@NonNull
Optional<TokenMap> getTokenMap();

/**
* The tablet map for this cluster.
*
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
* tablet.
*/
TabletMap getTabletMap();

/**
* The cluster name to which this session is connected. The Optional returned should contain the
* value from the server for <b>system.local.cluster_name</b>.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
import java.util.Set;

/**
* Represents a tablet as described in tablets-routing-v1 protocol extension with some additional
* fields for ease of use.
*/
@Beta
public interface Tablet extends Comparable<Tablet> {
/**
* Returns left endpoint of an interval. This interval is left-open, meaning the tablet does not
* own the token equal to the first token.
*
* @return {@code long} value representing first token.
*/
public long getFirstToken();

/**
* Returns right endpoint of an interval. This interval is right-closed, which means that last
* token is owned by this tablet.
*
* @return {@code long} value representing last token.
*/
public long getLastToken();

public Set<Node> getReplicaNodes();

/**
* Looks up the shard number for specific replica Node.
*
* @param node one of the replica nodes of this tablet.
* @return Shard number for the replica or -1 if no such Node found.
*/
public int getShardForNode(Node node);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;

/** Holds all currently known tablet metadata. */
@Beta
public interface TabletMap {
/**
* Returns mapping from tables to the sets of their tablets.
*
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
*/
public ConcurrentMap<KeyspaceTableNamePair, ConcurrentSkipListSet<Tablet>> getMapping();

/**
* Adds a single tablet to the map. Handles removal of overlapping tablets.
*
* @param keyspace target keyspace
* @param table target table
* @param tablet tablet instance to add
*/
public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet);

/**
* Returns {@link Tablet} instance
*
* @param keyspace tablet's keyspace
* @param table tablet's table
* @param token target token
* @return {@link Tablet} responsible for provided token or {@code null} if no such tablet is
* present.
*/
public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public interface Request {
@Nullable
CqlIdentifier getRoutingKeyspace();

/**
* The table to use for tablet-aware routing. Infers the table from available ColumnDefinitions or
* {@code null} if it is not possible.
*
* @return
*/
@Nullable
default CqlIdentifier getRoutingTable() {
return null;
}

/**
* The partition key to use for token-aware routing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.protocol.internal.Message;
Expand Down Expand Up @@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
private ChannelHandlerContext ctx;
private final boolean querySupportedOptions;
private LwtInfo lwtInfo;
private TabletInfo tabletInfo;

/**
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
Expand Down Expand Up @@ -191,6 +193,9 @@ Message getRequest() {
if (lwtInfo != null) {
lwtInfo.addOption(startupOptions);
}
if (tabletInfo != null && tabletInfo.isEnabled()) {
TabletInfo.addOption(startupOptions);
}
return request = new Startup(startupOptions);
case GET_CLUSTER_NAME:
return request = CLUSTER_NAME_QUERY;
Expand Down Expand Up @@ -230,6 +235,7 @@ void onResponse(Message response) {
if (lwtInfo != null) {
channel.attr(LWT_INFO_KEY).set(lwtInfo);
}
tabletInfo = TabletInfo.parseTabletInfo(res.options);
step = Step.STARTUP;
send();
} else if (step == Step.STARTUP && response instanceof Ready) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.Tablet;
import com.datastax.oss.driver.api.core.metadata.TabletMap;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
import com.datastax.oss.driver.api.core.metadata.token.Token;
Expand All @@ -59,8 +61,10 @@
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
Expand Down Expand Up @@ -284,6 +288,51 @@ private Token getRoutingToken(Statement statement) {
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
}

public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
if (!(token instanceof TokenLong64)) {
LOG.trace(
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
token,
statement);
return null;
}
CqlIdentifier statementKeyspace = statement.getKeyspace();
if (statementKeyspace == null) {
statementKeyspace = statement.getRoutingKeyspace();
}
if (statementKeyspace == null) {
statementKeyspace = this.keyspace;
}
CqlIdentifier statementTable = statement.getRoutingTable();
if (statementKeyspace == null || statementTable == null) {
return null;
}
long tokenValue = ((TokenLong64) token).getValue();

Tablet targetTablet = tabletMap.getTablet(statementKeyspace, statementTable, tokenValue);
if (targetTablet == null) {
LOG.trace(
"Could not determine shard for token {} and table {}.{} on Node {}: Could not find corresponding tablet, returning null.",
token,
statementKeyspace,
statementTable,
node);
return null;
}
int shard = targetTablet.getShardForNode(node);
if (shard == -1) {
LOG.trace(
"Could not find shard corresponding to token {} and Node {} for table {} in keyspace {}. Returning null.",
token,
node,
statementTable,
statementKeyspace);
return null;
}
return shard;
}

/**
* Sends the request to the next available node.
*
Expand All @@ -309,9 +358,20 @@ private void sendRequest(
Node node = retriedNode;
DriverChannel channel = null;
if (node == null
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
|| (channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
== null) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
if (channel != null) {
break;
} else {
Expand Down Expand Up @@ -420,6 +480,18 @@ private void setFinalResult(
totalLatencyNanos,
TimeUnit.NANOSECONDS);
}
if (resultSet.getColumnDefinitions().size() > 0
&& resultSet
.getExecutionInfo()
.getIncomingPayload()
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
context
.getMetadataManager()
.addTabletFromPayload(
resultSet.getColumnDefinitions().get(0).getKeyspace(),
resultSet.getColumnDefinitions().get(0).getTable(),
resultSet.getExecutionInfo().getIncomingPayload());
}
}
// log the warnings if they have NOT been disabled
if (!executionInfo.getWarnings().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,17 @@ public CqlIdentifier getRoutingKeyspace() {
return null;
}

@Override
public CqlIdentifier getRoutingTable() {
for (BatchableStatement<?> statement : statements) {
CqlIdentifier ks = statement.getRoutingTable();
if (ks != null) {
return ks;
}
}
return null;
}

@NonNull
@Override
public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ public CqlIdentifier getRoutingKeyspace() {
}
}

@Override
public CqlIdentifier getRoutingTable() {
ColumnDefinitions definitions = preparedStatement.getVariableDefinitions();
return (definitions.size() == 0) ? null : definitions.get(0).getTable();
}

@NonNull
@Override
public BoundStatement setRoutingKeyspace(@Nullable CqlIdentifier newRoutingKeyspace) {
Expand Down
Loading

0 comments on commit 07dfa3e

Please sign in to comment.