Skip to content

Commit

Permalink
Use the keyspace aware sep queue for everything, call it parameterized (
Browse files Browse the repository at this point in the history
#10)

Range scans now get parameterized with the table name (to avoid e.g.
files range scans ruining the much less frequent other ones); the
assumption is that scans on the same table are heterogeneous.

No change elsewhere.
  • Loading branch information
j-baker authored and tpetracca committed Mar 7, 2019
1 parent c598921 commit f5355d1
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService.FutureTask;
import org.apache.cassandra.db.AbstractRangeCommand;
import org.apache.cassandra.service.IReadCommand;

public final class KeyspaceAwareSepQueue extends AbstractQueue<FutureTask<?>>
public final class ParameterizedSepQueue extends AbstractQueue<FutureTask<?>>
{
private static final Logger log = LoggerFactory.getLogger(KeyspaceAwareSepQueue.class);
private static final ThreadLocal<String> currentKeyspace = new ThreadLocal<>();
private static final Logger log = LoggerFactory.getLogger(ParameterizedSepQueue.class);
private static final ThreadLocal<String> queueingParameter = new ThreadLocal<>();
@GuardedBy("this")
private final Map<String, Queue<FutureTask<?>>> queue = new LinkedHashMap<>();

Expand All @@ -49,8 +51,8 @@ private Queue<FutureTask<?>> queue(String keyspace) {

// If someone didn't set the current keyspace, gracefully fall back to a backup queue.
private static String currentKeyspace() {
String current = currentKeyspace.get();
currentKeyspace.remove();
String current = queueingParameter.get();
queueingParameter.remove();
if (current == null) {
log.info("Saw a command where the current keyspace was not set, which indicates a bug");
return "";
Expand Down Expand Up @@ -95,7 +97,16 @@ public FutureTask<?> peek()
throw new UnsupportedOperationException();
}

public static void setCurrentKeyspace(String keyspace) {
currentKeyspace.set(checkNotNull(keyspace));
/**
* Hypothesis: Gets happen on all tables, so you want to prioritize per keyspace. Scans happen on individual
* tables, and we'd rather indicate that gets to a specific table are expensive rather than all tables; better
* to take out reads to a single table than all.
*/
public static void setQueueingParameter(IReadCommand read) {
if (read instanceof AbstractRangeCommand) {
queueingParameter.set(((AbstractRangeCommand) read).columnFamily);
} else {
queueingParameter.set(read.getKeyspace());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public LocalAwareExecutorService newKeyspaceAwareExecutor(
int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
{
SEPExecutor executor = new SEPExecutor(
this, new KeyspaceAwareSepQueue(), maxConcurrency, maxQueuedTasks, jmxPath, name);
this, new ParameterizedSepQueue(), maxConcurrency, maxQueuedTasks, jmxPath, name);
executors.add(executor);
return executor;
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/concurrent/StageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class StageManager
{
stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.READ, readStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.READ_CHEAP, readStage(Stage.READ_CHEAP, getConcurrentReaders()));
stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/net/MessageIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.KeyspaceAwareSepQueue;
import org.apache.cassandra.concurrent.ParameterizedSepQueue;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
Expand Down Expand Up @@ -106,7 +106,7 @@ public Stage getMessageType()
{
if (payload instanceof ReadCommand) {
ReadCommand command = (ReadCommand) payload;
KeyspaceAwareSepQueue.setCurrentKeyspace(command.ksName);
ParameterizedSepQueue.setQueueingParameter(command);
return command.isCheap() ? Stage.READ_CHEAP : Stage.READ;
}
return MessagingService.verbStages.get(verb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.KeyspaceAwareSepQueue;
import org.apache.cassandra.concurrent.ParameterizedSepQueue;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
Expand Down Expand Up @@ -117,7 +117,7 @@ private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoin
if (hasLocalEndpoint)
{
logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
KeyspaceAwareSepQueue.setCurrentKeyspace(command.ksName);
ParameterizedSepQueue.setQueueingParameter(command);
StageManager.getStage(stage(command)).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ParameterizedSepQueue;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
Expand Down Expand Up @@ -1849,6 +1850,7 @@ public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyL
if (filteredEndpoints.size() == 1
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
{
ParameterizedSepQueue.setQueueingParameter(command);
StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
Expand Down

0 comments on commit f5355d1

Please sign in to comment.