Skip to content

Commit

Permalink
HADOOP-17165. Implement service-user feature in DecayRPCScheduler. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tasanuma authored Sep 9, 2020
1 parent 8511926 commit e5fe326
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -108,6 +111,13 @@ public class DecayRpcScheduler implements RpcScheduler,
public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
"faircallqueue.decay-scheduler.thresholds";

/**
* Service users will always be scheduled into the highest-priority queue.
* They are specified as a comma-separated list.
*/
public static final String IPC_DECAYSCHEDULER_SERVICE_USERS_KEY =
"decay-scheduler.service-users";

// Specifies the identity to use when the IdentityProvider cannot handle
// a schedulable.
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
Expand Down Expand Up @@ -178,6 +188,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private static final double PRECISION = 0.0001;
private MetricsProxy metricsProxy;
private final CostProvider costProvider;
private Set<String> serviceUserNames;

/**
* This TimerTask will call decayCurrentCosts until
Expand Down Expand Up @@ -229,6 +240,7 @@ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
conf);
this.backOffResponseTimeThresholds =
parseBackOffResponseTimeThreshold(ns, conf, numLevels);
this.serviceUserNames = this.parseServiceUserNames(ns, conf);

// Setup response time metrics
responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
Expand Down Expand Up @@ -359,6 +371,12 @@ private static double[] parseThresholds(String ns, Configuration conf,
return decimals;
}

private Set<String> parseServiceUserNames(String ns, Configuration conf) {
Collection<String> collection = conf.getStringCollection(
ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY);
return new HashSet<>(collection);
}

/**
* Generate default thresholds if user did not specify. Strategy is
* to halve each time, since queue usage tends to be exponential.
Expand Down Expand Up @@ -486,7 +504,7 @@ private void recomputeScheduleCache() {
AtomicLong value = entry.getValue().get(0);

long snapshot = value.get();
int computedLevel = computePriorityLevel(snapshot);
int computedLevel = computePriorityLevel(snapshot, id);

nextCache.put(id, computedLevel);
}
Expand Down Expand Up @@ -534,9 +552,15 @@ private void addCost(Object identity, long costDelta) {
* Given the cost for an identity, compute a scheduling decision.
*
* @param cost the cost for an identity
* @param identity the identity of the user
* @return scheduling decision from 0 to numLevels - 1
*/
private int computePriorityLevel(long cost) {
private int computePriorityLevel(long cost, Object identity) {
// The priority for service users is always 0
if (isServiceUser((String)identity)) {
return 0;
}

long totalCallSnapshot = totalDecayedCallCost.get();

double proportion = 0;
Expand Down Expand Up @@ -576,7 +600,7 @@ private int cachedOrComputedPriorityLevel(Object identity) {
// Cache was no good, compute it
List<AtomicLong> costList = callCosts.get(identity);
long currentCost = costList == null ? 0 : costList.get(0).get();
int priority = computePriorityLevel(currentCost);
int priority = computePriorityLevel(currentCost, identity);
LOG.debug("compute priority for {} priority {}", identity, priority);
return priority;
}
Expand All @@ -598,6 +622,10 @@ public int getPriorityLevel(Schedulable obj) {
return cachedOrComputedPriorityLevel(identity);
}

private boolean isServiceUser(String userName) {
return this.serviceUserNames.contains(userName);
}

@Override
public boolean shouldBackOff(Schedulable obj) {
Boolean backOff = false;
Expand Down Expand Up @@ -698,6 +726,11 @@ long getDecayPeriodMillis() {
return thresholds;
}

@VisibleForTesting
Set<String> getServiceUserNames() {
return serviceUserNames;
}

@VisibleForTesting
void forceDecay() {
decayCurrentCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,14 @@
</description>
</property>

<property>
<name>ipc.[port_number].decay-scheduler.service-users</name>
<value></value>
<description>Service users will always be scheduled into the highest-priority
queue. They are specified as a comma-separated list.
</description>
</property>

<property>
<name>ipc.[port_number].weighted-cost.lockshared</name>
<value>10</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ This is configurable via the **identity provider**, which defaults to the **User
provider simply uses the username of the client submitting the request. However, a custom identity provider can be used
to performing throttling based on other groupings, or using an external identity provider.

If particular users submit important requests and you don't want to limit them, you can set them up as the
**service-users**. They are always scheduled into the high-priority queue.

### Cost-based Fair Call Queue

Though the fair call queue itself does a good job of mitigating the impact from users who submit a very high _number_
Expand Down Expand Up @@ -138,6 +141,7 @@ omitted.
| decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false |
| decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) |
| decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 |
| decay-scheduler.service-users | DecayRpcScheduler | Service users will always be scheduled into the highest-priority queue. They are specified as a comma-separated list. | |
| weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 |
| weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 |
| weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public void initializeMemberVariables() {
"ipc.[port_number].decay-scheduler.backoff.responsetime.thresholds");
xmlPropsToSkipCompare.add(
"ipc.[port_number].decay-scheduler.metrics.top.user.count");
xmlPropsToSkipCompare.add(
"ipc.[port_number].decay-scheduler.service-users");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockshared");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockexclusive");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@

import static java.lang.Thread.sleep;

import java.util.Map;
import org.eclipse.jetty.util.ajax.JSON;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -383,4 +388,37 @@ private int getPriorityIncrementCallCount(String callId) {
scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails);
return priority;
}

@Test
public void testServiceUsers() {
Configuration conf = new Configuration();
conf.setLong("ipc.19."
+ DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
conf.set("ipc.19." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY,
"service1,service2");
scheduler = new DecayRpcScheduler(4, "ipc.19", conf);

assertTrue(scheduler.getServiceUserNames().contains("service1"));
assertTrue(scheduler.getServiceUserNames().contains("service2"));

for (int i = 0; i < 10; i++) {
getPriorityIncrementCallCount("user1");
getPriorityIncrementCallCount("service1");
getPriorityIncrementCallCount("service2");
}

assertNotEquals(0, scheduler.getPriorityLevel(mockCall("user1")));
// The priorities of service users should be always 0.
assertEquals(0, scheduler.getPriorityLevel(mockCall("service1")));
assertEquals(0, scheduler.getPriorityLevel(mockCall("service2")));

// DecayRpcScheduler caches priorities after decay
scheduler.forceDecay();
// Check priorities on cache
String summary = scheduler.getSchedulingDecisionSummary();
Map<String, Object> summaryMap = (Map<String, Object>) JSON.parse(summary);
assertNotEquals(0L, summaryMap.get("user1"));
assertEquals(0L, summaryMap.get("service1"));
assertEquals(0L, summaryMap.get("service2"));
}
}

0 comments on commit e5fe326

Please sign in to comment.