From bc9cfb3cb7dd2edd584cf4c22ce2445b98226596 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Mon, 18 May 2020 23:00:42 +0800 Subject: [PATCH] Refactor degrade hierarchy with new circuit breaker mechanism and improve strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add `CircuitBreaker` abstraction (with half-open state) and add circuit breaker state change event observer support. * Improve circuit breaking strategy (avg RT → slow request ratio) and make statistics of each rule dependent (to support arbitrary statistic interval). * Add simple "trial" mechanism (aka. half-open). * Refactor mechanism of metric recording and state change handling for circuit breakers: record RT and error when requests have completed (i.e. `onExit`, based on #1420). Signed-off-by: Eric Zhao --- .../slots/block/degrade/DegradeRule.java | 165 +++++---------- .../block/degrade/DegradeRuleManager.java | 195 +++++++++++------- .../slots/block/degrade/DegradeSlot.java | 57 ++++- .../AbstractCircuitBreaker.java | 147 +++++++++++++ .../circuitbreaker/CircuitBreaker.java | 79 +++++++ .../CircuitBreakerStateChangeObserver.java | 42 ++++ .../CircuitBreakerStrategy.java | 46 +++++ .../circuitbreaker/EventObserverRegistry.java | 70 +++++++ .../ExceptionCircuitBreaker.java | 156 ++++++++++++++ .../ResponseTimeCircuitBreaker.java | 150 ++++++++++++++ 10 files changed, 901 insertions(+), 206 deletions(-) create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java index aee71ff543..57d20729f5 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java @@ -15,19 +15,12 @@ */ package com.alibaba.csp.sentinel.slots.block.degrade; -import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.context.Context; -import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slots.block.AbstractRule; import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Objects; /** *

@@ -52,13 +45,10 @@ * * * @author jialiang.linjl + * @author Eric Zhao */ public class DegradeRule extends AbstractRule { - @SuppressWarnings("PMD.ThreadPoolCreationRule") - private static ScheduledExecutorService pool = Executors.newScheduledThreadPool( - Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true)); - public DegradeRule() {} public DegradeRule(String resourceName) { @@ -66,33 +56,34 @@ public DegradeRule(String resourceName) { } /** - * RT threshold or exception ratio threshold count. + * Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count). */ - private double count; + private int grade = RuleConstant.DEGRADE_GRADE_RT; /** - * Degrade recover timeout (in seconds) when degradation occurs. + * Threshold count. */ - private int timeWindow; + private double count; /** - * Degrade strategy (0: average RT, 1: exception ratio, 2: exception count). + * Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will + * transform to half-open state for trying a few requests. */ - private int grade = RuleConstant.DEGRADE_GRADE_RT; + private int timeWindow; /** - * Minimum number of consecutive slow requests that can trigger RT circuit breaking. + * Minimum number of requests (in an active statistic time span) that can trigger circuit breaking. * * @since 1.7.0 */ - private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT; + private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT; /** - * Minimum number of requests (in an active statistic time span) that can trigger circuit breaking. - * - * @since 1.7.0 + * The threshold of slow request ratio in RT mode. */ - private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT; + private double slowRatioThreshold = 1.0d; + + private int statIntervalMs = 1000; public int getGrade() { return grade; @@ -121,21 +112,30 @@ public DegradeRule setTimeWindow(int timeWindow) { return this; } - public int getRtSlowRequestAmount() { - return rtSlowRequestAmount; + public int getMinRequestAmount() { + return minRequestAmount; } - public DegradeRule setRtSlowRequestAmount(int rtSlowRequestAmount) { - this.rtSlowRequestAmount = rtSlowRequestAmount; + public DegradeRule setMinRequestAmount(int minRequestAmount) { + this.minRequestAmount = minRequestAmount; return this; } - public int getMinRequestAmount() { - return minRequestAmount; + public double getSlowRatioThreshold() { + return slowRatioThreshold; } - public DegradeRule setMinRequestAmount(int minRequestAmount) { - this.minRequestAmount = minRequestAmount; + public DegradeRule setSlowRatioThreshold(double slowRatioThreshold) { + this.slowRatioThreshold = slowRatioThreshold; + return this; + } + + public int getStatIntervalMs() { + return statIntervalMs; + } + + public DegradeRule setStatIntervalMs(int statIntervalMs) { + this.statIntervalMs = statIntervalMs; return this; } @@ -144,23 +144,19 @@ public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } if (!super.equals(o)) { return false; } - DegradeRule that = (DegradeRule) o; - return Double.compare(that.count, count) == 0 && - timeWindow == that.timeWindow && - grade == that.grade && - rtSlowRequestAmount == that.rtSlowRequestAmount && - minRequestAmount == that.minRequestAmount; + DegradeRule rule = (DegradeRule)o; + return Double.compare(rule.count, count) == 0 && + timeWindow == rule.timeWindow && + grade == rule.grade && + minRequestAmount == rule.minRequestAmount && + Double.compare(rule.slowRatioThreshold, slowRatioThreshold) == 0 && + statIntervalMs == rule.statIntervalMs; } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + new Double(count).hashCode(); - result = 31 * result + timeWindow; - result = 31 * result + grade; - result = 31 * result + rtSlowRequestAmount; - result = 31 * result + minRequestAmount; - return result; + return Objects.hash(super.hashCode(), count, timeWindow, grade, minRequestAmount, + slowRatioThreshold, statIntervalMs); } @Override @@ -171,84 +167,15 @@ public String toString() { ", count=" + count + ", limitApp=" + getLimitApp() + ", timeWindow=" + timeWindow + - ", rtSlowRequestAmount=" + rtSlowRequestAmount + ", minRequestAmount=" + minRequestAmount + - "}"; + ", slowRatioThreshold=" + slowRatioThreshold + + ", statIntervalMs=" + statIntervalMs + + '}'; } - // Internal implementation (will be deprecated and moved outside). - - private AtomicLong passCount = new AtomicLong(0); - private final AtomicBoolean cut = new AtomicBoolean(false); - @Override - public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { - if (cut.get()) { - return false; - } - - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); - if (clusterNode == null) { - return true; - } - - if (grade == RuleConstant.DEGRADE_GRADE_RT) { - double rt = clusterNode.avgRt(); - if (rt < this.count) { - passCount.set(0); - return true; - } - - // Sentinel will degrade the service only if count exceeds. - if (passCount.incrementAndGet() < rtSlowRequestAmount) { - return true; - } - } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { - double exception = clusterNode.exceptionQps(); - double success = clusterNode.successQps(); - double total = clusterNode.totalQps(); - // If total amount is less than minRequestAmount, the request will pass. - if (total < minRequestAmount) { - return true; - } - - // In the same aligned statistic time window, - // "success" (aka. completed count) = exception count + non-exception count (realSuccess) - double realSuccess = success - exception; - if (realSuccess <= 0 && exception < minRequestAmount) { - return true; - } - - if (exception / success < count) { - return true; - } - } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { - double exception = clusterNode.totalException(); - if (exception < count) { - return true; - } - } - - if (cut.compareAndSet(false, true)) { - ResetTask resetTask = new ResetTask(this); - pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS); - } - + @Deprecated + public boolean passCheck(Context context, DefaultNode node, int count, Object... args) { return false; } - - private static final class ResetTask implements Runnable { - - private DegradeRule rule; - - ResetTask(DegradeRule rule) { - this.rule = rule; - } - - @Override - public void run() { - rule.passCount.set(0); - rule.cut.set(false); - } - } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java index 6cb09f8e97..d7d6b6c9f9 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java @@ -21,29 +21,29 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import com.alibaba.csp.sentinel.config.SentinelConfig; -import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.log.RecordLog; -import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; import com.alibaba.csp.sentinel.property.PropertyListener; import com.alibaba.csp.sentinel.property.SentinelProperty; -import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; -import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.StringUtil; /** + * The rule manager for circuit breaking rules ({@link DegradeRule}). + * * @author youji.zj * @author jialiang.linjl * @author Eric Zhao */ public final class DegradeRuleManager { - private static final Map> degradeRules = new ConcurrentHashMap<>(); + private static volatile Map> circuitBreakers = new HashMap<>(); + private static volatile Map> ruleMap = new HashMap<>(); private static final RulePropertyListener LISTENER = new RulePropertyListener(); private static SentinelProperty> currentProperty @@ -69,41 +69,37 @@ public static void register2Property(SentinelProperty> propert } } - public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) - throws BlockException { - - Set rules = degradeRules.get(resource.getName()); - if (rules == null) { - return; - } - - for (DegradeRule rule : rules) { - if (!rule.passCheck(context, node, count)) { - throw new DegradeException(rule.getLimitApp(), rule); - } - } + static List getCircuitBreakers(String resourceName) { + return circuitBreakers.get(resourceName); } public static boolean hasConfig(String resource) { if (resource == null) { return false; } - return degradeRules.containsKey(resource); + return circuitBreakers.containsKey(resource); } /** - * Get a copy of the rules. + *

Get existing circuit breaking rules.

+ *

Note: DO NOT modify the rules from the returned list directly. + * The behavior is undefined.

* - * @return a new copy of the rules. + * @return list of existing circuit breaking rules, or empty list if no rules were loaded */ public static List getRules() { List rules = new ArrayList<>(); - for (Map.Entry> entry : degradeRules.entrySet()) { + for (Map.Entry> entry : ruleMap.entrySet()) { rules.addAll(entry.getValue()); } return rules; } + public static Set getRulesOfResource(String resource) { + AssertUtil.assertNotBlank(resource, "resource name cannot be blank"); + return ruleMap.get(resource); + } + /** * Load {@link DegradeRule}s, former rules will be replaced. * @@ -113,7 +109,7 @@ public static void loadRules(List rules) { try { currentProperty.updateValue(rules); } catch (Throwable e) { - RecordLog.warn("[DegradeRuleManager] Unexpected error when loading degrade rules", e); + RecordLog.error("[DegradeRuleManager] Unexpected error when loading degrade rules", e); } } @@ -128,7 +124,7 @@ public static void loadRules(List rules) { public static boolean setRulesForResource(String resourceName, Set rules) { AssertUtil.notEmpty(resourceName, "resourceName cannot be empty"); try { - Map> newRuleMap = new HashMap<>(degradeRules); + Map> newRuleMap = new HashMap<>(ruleMap); if (rules == null) { newRuleMap.remove(resourceName); } else { @@ -146,88 +142,127 @@ public static boolean setRulesForResource(String resourceName, Set } return currentProperty.updateValue(allRules); } catch (Throwable e) { - RecordLog.warn( - "[DegradeRuleManager] Unexpected error when setting degrade rules for resource: " + resourceName, e); + RecordLog.error("[DegradeRuleManager] Unexpected error when setting circuit breaking" + + " rules for resource: " + resourceName, e); + return false; + } + } + + private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) { + List cbs = getCircuitBreakers(rule.getResource()); + if (cbs == null || cbs.isEmpty()) { + return newCircuitBreakerFrom(rule); + } + for (CircuitBreaker cb : cbs) { + if (rule.equals(cb.getRule())) { + // Reuse the circuit breaker if the rule remains unchanged. + return cb; + } + } + return newCircuitBreakerFrom(rule); + } + + /** + * Create a circuit breaker instance from provided circuit breaking rule. + * + * @param rule a valid circuit breaking rule + * @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type + */ + private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) { + switch (rule.getGrade()) { + case RuleConstant.DEGRADE_GRADE_RT: + return new ResponseTimeCircuitBreaker(rule); + case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO: + case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT: + return new ExceptionCircuitBreaker(rule); + default: + return null; + } + } + + public static boolean isValidRule(DegradeRule rule) { + boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) + && rule.getCount() >= 0 && rule.getTimeWindow() > 0; + if (!baseValid) { + return false; + } + if (rule.getMinRequestAmount() <= 0 || rule.getStatIntervalMs() <= 0) { return false; } + switch (rule.getGrade()) { + case RuleConstant.DEGRADE_GRADE_RT: + return rule.getSlowRatioThreshold() >= 0 && rule.getSlowRatioThreshold() <= 1; + case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO: + return rule.getCount() <= 1; + case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT: + return true; + default: + return false; + } } private static class RulePropertyListener implements PropertyListener> { + private synchronized void reloadFrom(List list) { + Map> cbs = buildCircuitBreakers(list); + Map> rm = new HashMap<>(cbs.size()); + + for (Map.Entry> e : cbs.entrySet()) { + assert e.getValue() != null && !e.getValue().isEmpty(); + + Set rules = new HashSet<>(e.getValue().size()); + for (CircuitBreaker cb : e.getValue()) { + rules.add(cb.getRule()); + } + rm.put(e.getKey(), rules); + } + + DegradeRuleManager.circuitBreakers = cbs; + DegradeRuleManager.ruleMap = rm; + } + @Override public void configUpdate(List conf) { - Map> rules = loadDegradeConf(conf); - if (rules != null) { - degradeRules.clear(); - degradeRules.putAll(rules); - } - RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules); + reloadFrom(conf); + RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: " + ruleMap); } @Override public void configLoad(List conf) { - Map> rules = loadDegradeConf(conf); - if (rules != null) { - degradeRules.clear(); - degradeRules.putAll(rules); - } - RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + degradeRules); + reloadFrom(conf); + RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + ruleMap); } - private Map> loadDegradeConf(List list) { - Map> newRuleMap = new ConcurrentHashMap<>(); - + private Map> buildCircuitBreakers(List list) { + Map> cbMap = new HashMap<>(8); if (list == null || list.isEmpty()) { - return newRuleMap; + return cbMap; } - for (DegradeRule rule : list) { if (!isValidRule(rule)) { - RecordLog.warn( - "[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule); + RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: " + rule); continue; } if (StringUtil.isBlank(rule.getLimitApp())) { rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } - - String identity = rule.getResource(); - Set ruleSet = newRuleMap.get(identity); - if (ruleSet == null) { - ruleSet = new HashSet<>(); - newRuleMap.put(identity, ruleSet); + CircuitBreaker cb = getExistingSameCbOrNew(rule); + if (cb == null) { + RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: " + rule); + continue; } - ruleSet.add(rule); - } - return newRuleMap; - } - } + String resourceName = rule.getResource(); - public static boolean isValidRule(DegradeRule rule) { - boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) - && rule.getCount() >= 0 && rule.getTimeWindow() > 0; - if (!baseValid) { - return false; - } - int maxAllowedRt = SentinelConfig.statisticMaxRt(); - if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT) { - if (rule.getRtSlowRequestAmount() <= 0) { - return false; - } - // Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}. - if (rule.getCount() > maxAllowedRt) { - RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms)" - + " in RT mode will not take effect since it exceeds the max allowed value (%d ms)", - rule.getCount(), maxAllowedRt)); + List cbList = cbMap.get(resourceName); + if (cbList == null) { + cbList = new ArrayList<>(); + cbMap.put(resourceName, cbList); + } + cbList.add(cb); } + return cbMap; } - - // Check exception ratio mode. - if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { - return rule.getCount() <= 1 && rule.getMinRequestAmount() > 0; - } - return true; } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java index 8e8dd19109..9bc8c793aa 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java @@ -15,30 +15,73 @@ */ package com.alibaba.csp.sentinel.slots.block.degrade; +import java.util.List; + +import com.alibaba.csp.sentinel.Entry; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; import com.alibaba.csp.sentinel.spi.SpiOrder; +import com.alibaba.csp.sentinel.util.TimeUtil; /** - * A {@link ProcessorSlot} dedicates to {@link DegradeRule} checking. + * A {@link ProcessorSlot} dedicates to circuit breaking. * - * @author leyou + * @author Carpenter Lee + * @author Eric Zhao */ @SpiOrder(-1000) public class DegradeSlot extends AbstractLinkedProcessorSlot { @Override - public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) - throws Throwable { - DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count); + public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, + boolean prioritized, Object... args) throws Throwable { + performChecking(resourceWrapper); + fireEntry(context, resourceWrapper, node, count, prioritized, args); } + void performChecking(ResourceWrapper r) throws BlockException { + List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); + if (circuitBreakers == null || circuitBreakers.isEmpty()) { + return; + } + for (CircuitBreaker cb : circuitBreakers) { + if (!cb.tryPass()) { + throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); + } + } + } + @Override - public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { - fireExit(context, resourceWrapper, count, args); + public void exit(Context context, ResourceWrapper r, int count, Object... args) { + Entry curEntry = context.getCurEntry(); + if (curEntry.getBlockError() != null) { + fireExit(context, r, count, args); + return; + } + List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); + if (circuitBreakers == null || circuitBreakers.isEmpty()) { + fireExit(context, r, count, args); + return; + } + + if (curEntry.getBlockError() == null) { + long completeTime = curEntry.getCompleteTimestamp(); + if (completeTime <= 0) { + completeTime = TimeUtil.currentTimeMillis(); + } + long rt = completeTime - curEntry.getCreateTimestamp(); + Throwable error = curEntry.getError(); + for (CircuitBreaker circuitBreaker : circuitBreakers) { + circuitBreaker.onRequestComplete(rt, error); + } + } + + fireExit(context, r, count, args); } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java new file mode 100644 index 0000000000..2d3c2370af --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java @@ -0,0 +1,147 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import java.util.concurrent.atomic.AtomicReference; + +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * @author Eric Zhao + * @since 1.8.0 + */ +public abstract class AbstractCircuitBreaker implements CircuitBreaker { + + protected final DegradeRule rule; + protected final int recoveryTimeoutMs; + + private final EventObserverRegistry observerRegistry; + + protected final AtomicReference currentState = new AtomicReference<>(State.CLOSED); + protected volatile long nextRetryTimestamp; + + public AbstractCircuitBreaker(DegradeRule rule) { + this(rule, EventObserverRegistry.getInstance()); + } + + AbstractCircuitBreaker(DegradeRule rule, EventObserverRegistry observerRegistry) { + AssertUtil.notNull(observerRegistry, "observerRegistry cannot be null"); + if (!DegradeRuleManager.isValidRule(rule)) { + throw new IllegalArgumentException("Invalid DegradeRule: " + rule); + } + this.observerRegistry = observerRegistry; + this.rule = rule; + this.recoveryTimeoutMs = rule.getTimeWindow() * 1000; + } + + @Override + public DegradeRule getRule() { + return rule; + } + + @Override + public State currentState() { + return currentState.get(); + } + + @Override + public boolean tryPass() { + // Template implementation. + if (currentState.get() == State.CLOSED) { + return true; + } + if (currentState.get() == State.OPEN) { + // For half-open state we allow a request for trial. + return retryTimeoutArrived() && fromOpenToHalfOpen(); + } + return false; + } + + /** + * Reset the statistic data. + */ + abstract void resetStat(); + + protected boolean retryTimeoutArrived() { + return TimeUtil.currentTimeMillis() >= nextRetryTimestamp; + } + + protected void updateNextRetryTimestamp() { + this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs; + } + + protected boolean fromCloseToOpen(double snapshotValue) { + State prev = State.CLOSED; + if (currentState.compareAndSet(prev, State.OPEN)) { + updateNextRetryTimestamp(); + + for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { + observer.onStateChange(prev, State.OPEN, rule, snapshotValue); + } + return true; + } + return false; + } + + protected boolean fromOpenToHalfOpen() { + if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) { + for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { + observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null); + } + return true; + } + return false; + } + + protected boolean fromHalfOpenToOpen(double snapshotValue) { + if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) { + updateNextRetryTimestamp(); + for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { + observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue); + } + return true; + } + return false; + } + + protected boolean fromHalfOpenToClose() { + if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) { + resetStat(); + for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { + observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null); + } + return true; + } + return false; + } + + protected void transformToOpen(double triggerValue) { + State cs = currentState.get(); + switch (cs) { + case CLOSED: + fromCloseToOpen(triggerValue); + break; + case HALF_OPEN: + fromHalfOpenToOpen(triggerValue); + break; + default: + break; + } + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java new file mode 100644 index 0000000000..3e5faf43fc --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java @@ -0,0 +1,79 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; + +/** + *

Basic circuit breaker interface.

+ * + * @author Eric Zhao + */ +public interface CircuitBreaker { + + /** + * Get the associated circuit breaking rule. + * + * @return associated circuit breaking rule + */ + DegradeRule getRule(); + + /** + * Acquires permission of an invocation only if it is available at the time of invocation. + * + * @return {@code true} if permission was acquired and {@code false} otherwise + */ + boolean tryPass(); + + /** + * Get current state of the circuit breaker. + * + * @return current state of the circuit breaker + */ + State currentState(); + + /** + * Record a completed request with the given response time and error (if present) and + * handle state transformation of the circuit breaker. + * + * @param rt the response time of this entry + * @param error the error of this entry (if present) + */ + void onRequestComplete(long rt, Throwable error); + + /** + * Circuit breaker state. + */ + enum State { + /** + * In {@code OPEN} state, all requests will be rejected until the next recovery time point. + */ + OPEN, + /** + * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation. + * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker + * will re-transform to the {@code OPEN} state and wait for the next recovery time point; + * otherwise the resource will be regarded as "recovered" and the circuit breaker + * will cease cutting off requests and transform to {@code CLOSED} state. + */ + HALF_OPEN, + /** + * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold, + * the circuit breaker will transform to {@code OPEN} state. + */ + CLOSED + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java new file mode 100644 index 0000000000..e7e5b2dc76 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; + +/** + * @author Eric Zhao + * @since 1.8.0 + */ +public interface CircuitBreakerStateChangeObserver { + + /** + *

Observer method triggered when circuit breaker state changed. The transformation could be:

+ *
    + *
  • From {@code CLOSED} to {@code OPEN} (with the triggered metric)
  • + *
  • From {@code OPEN} to {@code HALF_OPEN}
  • + *
  • From {@code OPEN} to {@code CLOSED}
  • + *
  • From {@code HALF_OPEN} to {@code OPEN} (with the triggered metric)
  • + *
+ * + * @param prevState previous state of the circuit breaker + * @param newState new state of the circuit breaker + * @param rule associated rule + * @param snapshotValue triggered value on circuit breaker opens (null if the new state is CLOSED or HALF_OPEN) + */ + void onStateChange(CircuitBreaker.State prevState, CircuitBreaker.State newState, DegradeRule rule, + Double snapshotValue); +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java new file mode 100644 index 0000000000..79d3b12a32 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java @@ -0,0 +1,46 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +/** + * @author Eric Zhao + * @since 1.8.0 + */ +public enum CircuitBreakerStrategy { + + /** + * Circuit breaker opens (cuts off) when slow request ratio exceeds the threshold. + */ + SLOW_REQUEST_RATIO(0), + /** + * Circuit breaker opens (cuts off) when error ratio exceeds the threshold. + */ + ERROR_RATIO(1), + /** + * Circuit breaker opens (cuts off) when error count exceeds the threshold. + */ + ERROR_COUNT(2); + + private int type; + + CircuitBreakerStrategy(int type) { + this.type = type; + } + + public int getType() { + return type; + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java new file mode 100644 index 0000000000..2243eebaa5 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java @@ -0,0 +1,70 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.alibaba.csp.sentinel.util.AssertUtil; + +/** + *

Registry for circuit breaker event observers.

+ * + * @author Eric Zhao + * @since 1.8.0 + */ +public class EventObserverRegistry { + + private final Map stateChangeObserverMap = new HashMap<>(); + + /** + * Register a circuit breaker state change observer. + * + * @param name observer name + * @param observer a valid observer + */ + public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) { + AssertUtil.notNull(name, "name cannot be null"); + AssertUtil.notNull(observer, "observer cannot be null"); + stateChangeObserverMap.put(name, observer); + } + + public boolean removeStateChangeObserver(String name) { + AssertUtil.notNull(name, "name cannot be null"); + return stateChangeObserverMap.remove(name) != null; + } + + /** + * Get all registered state chane observers. + * + * @return all registered state chane observers + */ + public List getStateChangeObservers() { + return new ArrayList<>(stateChangeObserverMap.values()); + } + + public static EventObserverRegistry getInstance() { + return InstanceHolder.instance; + } + + private static class InstanceHolder { + private static EventObserverRegistry instance = new EventObserverRegistry(); + } + + EventObserverRegistry() {} +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java new file mode 100644 index 0000000000..d57a037b1d --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java @@ -0,0 +1,156 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import java.util.List; + +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; +import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.util.AssertUtil; + +import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO; +import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT; + +/** + * @author Eric Zhao + * @since 1.8.0 + */ +public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { + + private final int strategy; + private final int minRequestAmount; + private final double threshold; + + private final LeapArray stat; + + public ExceptionCircuitBreaker(DegradeRule rule) { + this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs())); + } + + ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) { + super(rule); + this.strategy = rule.getGrade(); + boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT; + AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count"); + AssertUtil.notNull(stat, "stat cannot be null"); + this.minRequestAmount = rule.getMinRequestAmount(); + this.threshold = rule.getCount(); + this.stat = stat; + } + + @Override + protected void resetStat() { + // Reset current bucket (bucket count = 1). + stat.currentWindow().value().reset(); + } + + @Override + public void onRequestComplete(long rt, Throwable error) { + SimpleErrorCounter counter = stat.currentWindow().value(); + if (error != null) { + counter.getErrorCount().add(1); + } + counter.getTotalCount().add(1); + + handleStateChangeWhenThresholdExceeded(error); + } + + private void handleStateChangeWhenThresholdExceeded(Throwable error) { + if (currentState.get() == State.OPEN) { + return; + } + if (currentState.get() == State.HALF_OPEN) { + if (error == null) { + fromHalfOpenToClose(); + } else { + fromHalfOpenToOpen(1.0d); + } + return; + } + List counters = stat.values(); + long errCount = 0; + long totalCount = 0; + for (SimpleErrorCounter counter : counters) { + errCount += counter.errorCount.sum(); + totalCount += counter.totalCount.sum(); + } + if (totalCount < minRequestAmount) { + return; + } + double curCount = errCount; + if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) { + // Use errorRatio + curCount = errCount * 1.0d / totalCount; + } + if (curCount > threshold) { + transformToOpen(curCount); + } + } + + static class SimpleErrorCounter { + private LongAdder errorCount; + private LongAdder totalCount; + + public SimpleErrorCounter() { + this.errorCount = new LongAdder(); + this.totalCount = new LongAdder(); + } + + public LongAdder getErrorCount() { + return errorCount; + } + + public LongAdder getTotalCount() { + return totalCount; + } + + public SimpleErrorCounter reset() { + errorCount.reset(); + totalCount.reset(); + return this; + } + + @Override + public String toString() { + return "SimpleErrorCounter{" + + "errorCount=" + errorCount + + ", totalCount=" + totalCount + + '}'; + } + } + + static class SimpleErrorCounterLeapArray extends LeapArray { + + public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) { + super(sampleCount, intervalInMs); + } + + @Override + public SimpleErrorCounter newEmptyBucket(long timeMillis) { + return new SimpleErrorCounter(); + } + + @Override + protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { + // Update the start time and reset value. + w.resetTo(startTime); + w.value().reset(); + return w; + } + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java new file mode 100644 index 0000000000..252aaa438f --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java @@ -0,0 +1,150 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; + +import java.util.List; + +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; +import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.util.AssertUtil; + +/** + * @author Eric Zhao + * @since 1.8.0 + */ +public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { + + private final long maxAllowedRt; + private final double maxSlowRequestRatio; + private final int minRequestAmount; + + private final LeapArray slidingCounter; + + public ResponseTimeCircuitBreaker(DegradeRule rule) { + this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs())); + } + + ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray stat) { + super(rule); + AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT"); + AssertUtil.notNull(stat, "stat cannot be null"); + this.maxAllowedRt = Math.round(rule.getCount()); + this.maxSlowRequestRatio = rule.getSlowRatioThreshold(); + this.minRequestAmount = rule.getMinRequestAmount(); + this.slidingCounter = stat; + } + + @Override + public void resetStat() { + // Reset current bucket (bucket count = 1). + slidingCounter.currentWindow().value().reset(); + } + + @Override + public void onRequestComplete(long rt, Throwable error) { + SlowRequestCounter counter = slidingCounter.currentWindow().value(); + if (rt > maxAllowedRt) { + counter.slowCount.add(1); + } + counter.totalCount.add(1); + + handleStateChangeWhenThresholdExceeded(rt); + } + + private void handleStateChangeWhenThresholdExceeded(long rt) { + if (currentState.get() == State.OPEN) { + return; + } + if (currentState.get() == State.HALF_OPEN) { + // TODO: improve logic for half-open recovery + if (rt > maxAllowedRt) { + fromHalfOpenToOpen(1.0d); + } else { + fromHalfOpenToClose(); + } + return; + } + + List counters = slidingCounter.values(); + long slowCount = 0; + long totalCount = 0; + for (SlowRequestCounter counter : counters) { + slowCount += counter.slowCount.sum(); + totalCount += counter.totalCount.sum(); + } + if (totalCount < minRequestAmount) { + return; + } + double currentRatio = slowCount * 1.0d / totalCount; + if (currentRatio > maxSlowRequestRatio) { + transformToOpen(currentRatio); + } + } + + static class SlowRequestCounter { + private LongAdder slowCount; + private LongAdder totalCount; + + public SlowRequestCounter() { + this.slowCount = new LongAdder(); + this.totalCount = new LongAdder(); + } + + public LongAdder getSlowCount() { + return slowCount; + } + + public LongAdder getTotalCount() { + return totalCount; + } + + public SlowRequestCounter reset() { + slowCount.reset(); + totalCount.reset(); + return this; + } + + @Override + public String toString() { + return "SlowRequestCounter{" + + "slowCount=" + slowCount + + ", totalCount=" + totalCount + + '}'; + } + } + + static class SlowRequestLeapArray extends LeapArray { + + public SlowRequestLeapArray(int sampleCount, int intervalInMs) { + super(sampleCount, intervalInMs); + } + + @Override + public SlowRequestCounter newEmptyBucket(long timeMillis) { + return new SlowRequestCounter(); + } + + @Override + protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { + w.resetTo(startTime); + w.value().reset(); + return w; + } + } +}