diff --git a/codes/distributed/ratelimit/sentinel/README.md b/codes/distributed/ratelimit/sentinel/README.md index 0c852bd4..693a9ccd 100644 --- a/codes/distributed/ratelimit/sentinel/README.md +++ b/codes/distributed/ratelimit/sentinel/README.md @@ -13,7 +13,7 @@ (2)启动 ```shell -java -jar sentinel-dashboard.jar -Dproject.name=sentinel-dashboard --server.port=18080 +java -jar sentinel-dashboard2.jar -Dproject.name=sentinel-dashboard --server.port=18080 ``` ### 启动 Provider 服务 @@ -105,4 +105,3 @@ java -jar sentinel-dashboard.jar -Dproject.name=sentinel-dashboard --server.port } } ``` - diff --git a/codes/distributed/ratelimit/sentinel/basic/pom.xml b/codes/distributed/ratelimit/sentinel/basic/pom.xml new file mode 100644 index 00000000..8404ce7d --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + + io.github.dunwu.spring + spring-distributed-ratelimit-sentinel-basic + 1.0.0 + jar + Spring::分布式::流量控制::Sentinel::Basic + + + 1.8 + @ + ${java.version} + ${java.version} + UTF-8 + UTF-8 + + + + + com.alibaba.csp + sentinel-core + 1.8.7 + + + com.alibaba.csp + sentinel-transport-simple-http + 1.8.7 + + + com.alibaba.csp + sentinel-annotation-aspectj + 1.8.7 + + + com.alibaba.csp + sentinel-parameter-flow-control + 1.8.7 + + + + org.apache.httpcomponents + httpclient + 4.5.14 + + + cn.hutool + hutool-all + 5.8.25 + + + diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/AsyncEntryDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/AsyncEntryDemo.java new file mode 100644 index 00000000..8c721744 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/AsyncEntryDemo.java @@ -0,0 +1,220 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel; + +import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * An example for asynchronous entry in Sentinel. + * + * @author Eric Zhao + * @since 0.2.0 + */ +public class AsyncEntryDemo { + + private void invoke(String arg, Consumer handler) { + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(3); + String resp = arg + ": " + System.currentTimeMillis(); + handler.accept(resp); + } catch (Exception ex) { + ex.printStackTrace(); + } + }); + } + + private void anotherAsync() { + try { + final AsyncEntry entry = SphU.asyncEntry("test-another-async"); + + CompletableFuture.runAsync(() -> { + ContextUtil.runOnContext(entry.getAsyncContext(), () -> { + try { + TimeUnit.SECONDS.sleep(2); + // Normal entry nested in asynchronous entry. + anotherSyncInAsync(); + + System.out.println("Async result: 666"); + } catch (InterruptedException e) { + // Ignore. + } finally { + entry.exit(); + } + }); + }); + } catch (BlockException ex) { + ex.printStackTrace(); + } + } + + private void fetchSync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void fetchSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void anotherSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-another-sync-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void directlyAsync() { + try { + final AsyncEntry entry = SphU.asyncEntry("test-async-not-nested"); + + this.invoke("abc", result -> { + // If no nested entry later, we don't have to wrap in `ContextUtil.runOnContext()`. + try { + // Here to handle the async result (without other entry). + } finally { + // Exit the async entry. + entry.exit(); + } + }); + } catch (BlockException e) { + // Request blocked, handle the exception. + e.printStackTrace(); + } + } + + private void doAsyncThenSync() { + try { + // First we call an asynchronous resource. + final AsyncEntry entry = SphU.asyncEntry("test-async"); + this.invoke("abc", resp -> { + // The thread is different from original caller thread for async entry. + // So we need to wrap in the async context so that nested invocation entry + // can be linked to the parent asynchronous entry. + ContextUtil.runOnContext(entry.getAsyncContext(), () -> { + try { + // In the callback, we do another async invocation several times under the async context. + for (int i = 0; i < 7; i++) { + anotherAsync(); + } + + System.out.println(resp); + + // Then we do a sync (normal) entry under current async context. + fetchSyncInAsync(); + } finally { + // Exit the async entry. + entry.exit(); + } + }); + }); + // Then we call a sync resource. + fetchSync(); + } catch (BlockException ex) { + // Request blocked, handle the exception. + ex.printStackTrace(); + } + } + + public static void main(String[] args) throws Exception { + initFlowRule(); + + AsyncEntryDemo service = new AsyncEntryDemo(); + + // Expected invocation chain: + // + // EntranceNode: machine-root + // -EntranceNode: async-context + // --test-top + // ---test-sync + // ---test-async + // ----test-another-async + // -----test-another-sync-in-async + // ----test-sync-in-async + ContextUtil.enter("async-context", "originA"); + Entry entry = null; + try { + entry = SphU.entry("test-top"); + System.out.println("Do something..."); + service.doAsyncThenSync(); + } catch (BlockException ex) { + // Request blocked, handle the exception. + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + ContextUtil.exit(); + } + + TimeUnit.SECONDS.sleep(20); + } + + private static void initFlowRule() { + // Rule 1 won't take effect as the limitApp doesn't match. + FlowRule rule1 = new FlowRule() + .setResource("test-another-sync-in-async") + .setLimitApp("originB") + .as(FlowRule.class) + .setCount(4) + .setGrade(RuleConstant.FLOW_GRADE_QPS); + // Rule 2 will take effect. + FlowRule rule2 = new FlowRule() + .setResource("test-another-async") + .setLimitApp("default") + .as(FlowRule.class) + .setCount(5) + .setGrade(RuleConstant.FLOW_GRADE_QPS); + List ruleList = Arrays.asList(rule1, rule2); + FlowRuleManager.loadRules(ruleList); + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/authority/AuthorityDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/authority/AuthorityDemo.java new file mode 100644 index 00000000..9217b53b --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/authority/AuthorityDemo.java @@ -0,0 +1,86 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.authority; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule; +import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager; + +import java.util.Collections; + +/** + * Authority rule is designed for limiting by request origins. In blacklist mode, + * requests will be blocked when blacklist contains current origin, otherwise will pass. + * In whitelist mode, only requests from whitelist origin can pass. + * + * @author Eric Zhao + */ +public class AuthorityDemo { + + private static final String RESOURCE_NAME = "testABC"; + + public static void main(String[] args) { + System.out.println("========Testing for black list========"); + initBlackRules(); + testFor(RESOURCE_NAME, "appA"); + testFor(RESOURCE_NAME, "appB"); + testFor(RESOURCE_NAME, "appC"); + testFor(RESOURCE_NAME, "appE"); + + System.out.println("========Testing for white list========"); + initWhiteRules(); + testFor(RESOURCE_NAME, "appA"); + testFor(RESOURCE_NAME, "appB"); + testFor(RESOURCE_NAME, "appC"); + testFor(RESOURCE_NAME, "appE"); + } + + private static void testFor(/*@NonNull*/ String resource, /*@NonNull*/ String origin) { + ContextUtil.enter(resource, origin); + Entry entry = null; + try { + entry = SphU.entry(resource); + System.out.println(String.format("Passed for resource %s, origin is %s", resource, origin)); + } catch (BlockException ex) { + System.err.println(String.format("Blocked for resource %s, origin is %s", resource, origin)); + } finally { + if (entry != null) { + entry.exit(); + } + ContextUtil.exit(); + } + } + + private static void initWhiteRules() { + AuthorityRule rule = new AuthorityRule(); + rule.setResource(RESOURCE_NAME); + rule.setStrategy(RuleConstant.AUTHORITY_WHITE); + rule.setLimitApp("appA,appE"); + AuthorityRuleManager.loadRules(Collections.singletonList(rule)); + } + + private static void initBlackRules() { + AuthorityRule rule = new AuthorityRule(); + rule.setResource(RESOURCE_NAME); + rule.setStrategy(RuleConstant.AUTHORITY_BLACK); + rule.setLimitApp("appA,appB"); + AuthorityRuleManager.loadRules(Collections.singletonList(rule)); + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/ExceptionRatioCircuitBreakerDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/ExceptionRatioCircuitBreakerDemo.java new file mode 100644 index 00000000..f2497b0d --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/ExceptionRatioCircuitBreakerDemo.java @@ -0,0 +1,178 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.degrade; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.Tracer; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStrategy; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jialiang.linjl + * @author Eric Zhao + */ +public class ExceptionRatioCircuitBreakerDemo { + + private static final String KEY = "some_service"; + + private static AtomicInteger total = new AtomicInteger(); + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger bizException = new AtomicInteger(); + + private static volatile boolean stop = false; + private static int seconds = 120; + + public static void main(String[] args) throws Exception { + initDegradeRule(); + registerStateChangeObserver(); + startTick(); + + final int concurrency = 8; + for (int i = 0; i < concurrency; i++) { + Thread entryThread = new Thread(() -> { + while (true) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + sleep(ThreadLocalRandom.current().nextInt(5, 10)); + pass.addAndGet(1); + + // Error probability is 45% + if (ThreadLocalRandom.current().nextInt(0, 100) > 55) { + // biz code raise an exception. + throw new RuntimeException("oops"); + } + } catch (BlockException e) { + block.addAndGet(1); + sleep(ThreadLocalRandom.current().nextInt(5, 10)); + } catch (Throwable t) { + bizException.incrementAndGet(); + // It's required to record exception here manually. + Tracer.traceEntry(t, entry); + } finally { + total.addAndGet(1); + if (entry != null) { + entry.exit(); + } + } + } + }); + entryThread.setName("sentinel-simulate-traffic-task-" + i); + entryThread.start(); + } + } + + private static void registerStateChangeObserver() { + EventObserverRegistry.getInstance().addStateChangeObserver("logging", + (prevState, newState, rule, snapshotValue) -> { + if (newState == State.OPEN) { + System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), + TimeUtil.currentTimeMillis(), snapshotValue)); + } else { + System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), + TimeUtil.currentTimeMillis())); + } + }); + } + + private static void initDegradeRule() { + List rules = new ArrayList<>(); + DegradeRule rule = new DegradeRule(KEY) + .setGrade(CircuitBreakerStrategy.ERROR_RATIO.getType()) + // Set ratio threshold to 50%. + .setCount(0.5d) + .setStatIntervalMs(30000) + .setMinRequestAmount(50) + // Retry timeout (in second) + .setTimeWindow(10); + rules.add(rule); + DegradeRuleManager.loadRules(rules); + System.out.println("Degrade rule loaded: " + rules); + } + + private static void sleep(int timeMs) { + try { + TimeUnit.MILLISECONDS.sleep(timeMs); + } catch (InterruptedException e) { + // ignore + } + } + + private static void startTick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-tick-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("Begin to run! Go go go!"); + System.out.println("See corresponding metrics.log for accurate statistic data"); + + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + long oldBizException = 0; + while (!stop) { + sleep(1000); + + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + long globalBizException = bizException.get(); + long oneSecondBizException = globalBizException - oldBizException; + oldBizException = globalBizException; + + System.out.println(TimeUtil.currentTimeMillis() + ", oneSecondTotal:" + oneSecondTotal + + ", oneSecondPass:" + oneSecondPass + + ", oneSecondBlock:" + oneSecondBlock + + ", oneSecondBizException:" + oneSecondBizException); + if (seconds-- <= 0) { + stop = true; + } + } + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total: " + total.get() + ", pass:" + pass.get() + + ", block:" + block.get() + ", bizException:" + bizException.get()); + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/SlowRatioCircuitBreakerDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/SlowRatioCircuitBreakerDemo.java new file mode 100644 index 00000000..36478f41 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/degrade/SlowRatioCircuitBreakerDemo.java @@ -0,0 +1,186 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.degrade; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStrategy; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Run this demo, and the output will be like: + * + *
+ * 1529399827825,total:0, pass:0, block:0
+ * 1529399828825,total:4263, pass:100, block:4164
+ * 1529399829825,total:19179, pass:4, block:19176 // circuit breaker opens
+ * 1529399830824,total:19806, pass:0, block:19806
+ * 1529399831825,total:19198, pass:0, block:19198
+ * 1529399832824,total:19481, pass:0, block:19481
+ * 1529399833826,total:19241, pass:0, block:19241
+ * 1529399834826,total:17276, pass:0, block:17276
+ * 1529399835826,total:18722, pass:0, block:18722
+ * 1529399836826,total:19490, pass:0, block:19492
+ * 1529399837828,total:19355, pass:0, block:19355
+ * 1529399838827,total:11388, pass:0, block:11388
+ * 1529399839829,total:14494, pass:104, block:14390 // After 10 seconds, the system restored
+ * 1529399840854,total:18505, pass:0, block:18505
+ * 1529399841854,total:19673, pass:0, block:19676
+ * 
+ * + * @author jialiang.linjl + * @author Eric Zhao + */ +public class SlowRatioCircuitBreakerDemo { + + private static final String KEY = "some_method"; + + private static volatile boolean stop = false; + private static int seconds = 120; + + private static AtomicInteger total = new AtomicInteger(); + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + + public static void main(String[] args) throws Exception { + initDegradeRule(); + registerStateChangeObserver(); + startTick(); + + int concurrency = 8; + for (int i = 0; i < concurrency; i++) { + Thread entryThread = new Thread(() -> { + while (true) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + pass.incrementAndGet(); + // RT: [40ms, 60ms) + sleep(ThreadLocalRandom.current().nextInt(40, 60)); + } catch (BlockException e) { + block.incrementAndGet(); + sleep(ThreadLocalRandom.current().nextInt(5, 10)); + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + } + }); + entryThread.setName("sentinel-simulate-traffic-task-" + i); + entryThread.start(); + } + } + + private static void registerStateChangeObserver() { + EventObserverRegistry.getInstance().addStateChangeObserver("logging", + (prevState, newState, rule, snapshotValue) -> { + if (newState == State.OPEN) { + System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), + TimeUtil.currentTimeMillis(), snapshotValue)); + } else { + System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), + TimeUtil.currentTimeMillis())); + } + }); + } + + private static void initDegradeRule() { + List rules = new ArrayList<>(); + DegradeRule rule = new DegradeRule(KEY) + .setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType()) + // Max allowed response time + .setCount(50) + // Retry timeout (in second) + .setTimeWindow(10) + // Circuit breaker opens when slow request ratio > 60% + .setSlowRatioThreshold(0.6) + .setMinRequestAmount(100) + .setStatIntervalMs(20000); + rules.add(rule); + + DegradeRuleManager.loadRules(rules); + System.out.println("Degrade rule loaded: " + rules); + } + + private static void sleep(int timeMs) { + try { + TimeUnit.MILLISECONDS.sleep(timeMs); + } catch (InterruptedException e) { + // ignore + } + } + + private static void startTick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-tick-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("Begin to run! Go go go!"); + System.out.println("See corresponding metrics.log for accurate statistic data"); + + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + + while (!stop) { + sleep(1000); + + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock); + + if (seconds-- <= 0) { + stop = true; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total: " + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsDemo.java new file mode 100644 index 00000000..8c68ba7a --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsDemo.java @@ -0,0 +1,162 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jialiang.linjl + */ +public class FlowQpsDemo { + + private static final String KEY = "abc"; + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + + private static volatile boolean stop = false; + + private static final int threadCount = 32; + + private static int seconds = 60 + 40; + + public static void main(String[] args) throws Exception { + initFlowQpsRule(); + + tick(); + // first make the system run on a very low condition + simulateTraffic(); + + System.out.println("===== begin to do flow control"); + System.out.println("only 20 requests per second can pass"); + + } + + private static void initFlowQpsRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + // set limit qps to 20 + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + private static void simulateTraffic() { + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new RunTask()); + t.setName("simulate-traffic-Task"); + t.start(); + } + } + + private static void tick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("begin to statistic!!!"); + + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(seconds + " send qps is: " + oneSecondTotal); + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock); + + if (seconds-- <= 0) { + stop = true; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total:" + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } + + static class RunTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + + try { + entry = SphU.entry(KEY); + // token acquired, means pass + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); + } catch (InterruptedException e) { + // ignore + } + } + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsRegexDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsRegexDemo.java new file mode 100644 index 00000000..f18318db --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowQpsRegexDemo.java @@ -0,0 +1,196 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class FlowQpsRegexDemo { + + private static final String KEY = "/A/.*"; + + private static Map passMap = new ConcurrentHashMap<>(); + private static Map blockMap = new ConcurrentHashMap<>(); + private static Map totalMap = new ConcurrentHashMap<>(); + + private static final List resourceNameList = Arrays.asList("/A/a", "/A/c", "/B/a"); + + private static volatile boolean stop = false; + + private static final int threadCount = 10; + + private static int seconds = 60 + 40; + + public static void main(String[] args) throws Exception { + initFlowQpsRule(); + + tick(); + // first make the system run on a very low condition + simulateTraffic(); + + System.out.println("===== begin to do flow control"); + System.out.println("Resources prefixed with /A/ can only pass 20 requests per second"); + + } + + private static void initFlowQpsRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + // set limit qps to 20 + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setRegex(true); + rule1.setLimitApp("default"); + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + private static void simulateTraffic() { + for (String resourceName : resourceNameList) { + passMap.put(resourceName, new AtomicInteger(0)); + blockMap.put(resourceName, new AtomicInteger(0)); + totalMap.put(resourceName, new AtomicInteger(0)); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new RunTask(resourceName)); + t.setName("simulate-traffic-Task-" + resourceName); + t.start(); + } + } + } + + private static void tick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + + private final Map oldTotalMap = new HashMap<>(); + private final Map oldPassMap = new HashMap<>(); + private final Map oldBlockMap = new HashMap<>(); + + TimerTask() { + for (String resource : resourceNameList) { + oldTotalMap.put(resource, 0L); + oldPassMap.put(resource, 0L); + oldBlockMap.put(resource, 0L); + } + } + + @Override + public void run() { + System.out.println("begin to statistic!!!"); + + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + for (String resource : resourceNameList) { + long oldTotal = oldTotalMap.get(resource); + long oldPass = oldPassMap.get(resource); + long oldBlock = oldBlockMap.get(resource); + AtomicInteger pass = passMap.get(resource); + AtomicInteger block = blockMap.get(resource); + AtomicInteger total = totalMap.get(resource); + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotalMap.put(resource, globalTotal); + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPassMap.put(resource, globalPass); + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlockMap.put(resource, globalBlock); + + System.out.println(seconds + " " + resource + " send qps is: " + oneSecondTotal); + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock); + } + + if (seconds-- <= 0) { + stop = true; + } + } + System.exit(0); + } + } + + static class RunTask implements Runnable { + + private final String resourceName; + private final AtomicInteger pass; + private final AtomicInteger block; + private final AtomicInteger total; + + RunTask(String resourceName) { + this.resourceName = resourceName; + pass = passMap.get(resourceName); + block = blockMap.get(resourceName); + total = totalMap.get(resourceName); + } + + @Override + public void run() { + while (!stop) { + Entry entry = null; + + try { + entry = SphU.entry(resourceName); + // token acquired, means pass + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowThreadDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowThreadDemo.java new file mode 100644 index 00000000..c493a302 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/FlowThreadDemo.java @@ -0,0 +1,155 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jialiang.linjl + */ +public class FlowThreadDemo { + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + private static AtomicInteger activeThread = new AtomicInteger(); + + private static volatile boolean stop = false; + private static final int threadCount = 100; + + private static int seconds = 60 + 40; + private static volatile int methodBRunningTime = 2000; + + public static void main(String[] args) throws Exception { + System.out.println( + "MethodA will call methodB. After running for a while, methodB becomes fast, " + + "which make methodA also become fast "); + tick(); + initFlowRule(); + + for (int i = 0; i < threadCount; i++) { + Thread entryThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Entry methodA = null; + try { + TimeUnit.MILLISECONDS.sleep(5); + methodA = SphU.entry("methodA"); + activeThread.incrementAndGet(); + Entry methodB = SphU.entry("methodB"); + TimeUnit.MILLISECONDS.sleep(methodBRunningTime); + methodB.exit(); + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (methodA != null) { + methodA.exit(); + activeThread.decrementAndGet(); + } + } + } + } + }); + entryThread.setName("working thread"); + entryThread.start(); + } + } + + private static void initFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource("methodA"); + // set limit concurrent thread for 'methodA' to 20 + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD); + rule1.setLimitApp("default"); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + private static void tick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("begin to statistic!!!"); + + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(seconds + " total qps is: " + oneSecondTotal); + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock + + " activeThread:" + activeThread.get()); + if (seconds-- <= 0) { + stop = true; + } + if (seconds == 40) { + System.out.println("method B is running much faster; more requests are allowed to pass"); + methodBRunningTime = 20; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total:" + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/PaceFlowDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/PaceFlowDemo.java new file mode 100644 index 00000000..10bda7e3 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/PaceFlowDemo.java @@ -0,0 +1,204 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + *

+ * If {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER} is set, incoming + * requests are passing at regular interval. When a new request arrives, the + * flow rule checks whether the interval between the new request and the + * previous request. If the interval is less than the count set in the rule + * first. If the interval is large, it will pass the request; otherwise, + * sentinel will calculate the waiting time for this request. If the waiting + * time is longer than the {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#maxQueueingTimeMs} set in the rule, + * the request will be rejected immediately. + * + * This method is widely used for pulsed flow. When a large amount of flow + * comes, we don't want to pass all these requests at once, which may drag the + * system down. We can make the system handle these requests at a steady pace by + * using this kind of rules. + * + *

+ * This demo demonstrates how to use {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER}. + *

+ * + *

+ * {@link #initPaceFlowRule() } create rules that uses + * {@code CONTROL_BEHAVIOR_RATE_LIMITER}. + *

+ * {@link #simulatePulseFlow()} simulates 100 requests that arrives at almost the + * same time. All these 100 request are passed at a fixed interval. + * + *

+ * Run this demo, results are as follows: + *

+ * pace behavior
+ * ....
+ * 1528872403887 one request pass, cost 9348 ms // every 100 ms pass one request.
+ * 1528872403986 one request pass, cost 9469 ms
+ * 1528872404087 one request pass, cost 9570 ms
+ * 1528872404187 one request pass, cost 9642 ms
+ * 1528872404287 one request pass, cost 9770 ms
+ * 1528872404387 one request pass, cost 9848 ms
+ * 1528872404487 one request pass, cost 9970 ms
+ * ...
+ * done
+ * total pass:100, total block:0
+ * 
+ * + * Then we invoke {@link #initDefaultFlowRule()} to set rules with default behavior, and only 10 + * requests will be allowed to pass, other requests will be rejected immediately. + *

+ * The output will be like: + *

+ * default behavior
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101279 one request pass, cost 0 ms
+ * 1530500101280 one request pass, cost 1 ms
+ * 1530500101280 one request pass, cost 0 ms
+ * 1530500101280 one request pass, cost 0 ms
+ * 1530500101280 one request pass, cost 0 ms
+ * done
+ * total pass:10, total block:90 // 10 requests passed, other 90 requests rejected immediately.
+ * 
+ * + * @author jialiang.linjl + */ +public class PaceFlowDemo { + + private static final String KEY = "abc"; + + private static volatile CountDownLatch countDown; + + private static final Integer requestQps = 100; + private static final Integer count = 10; + private static final AtomicInteger done = new AtomicInteger(); + private static final AtomicInteger pass = new AtomicInteger(); + private static final AtomicInteger block = new AtomicInteger(); + + public static void main(String[] args) throws InterruptedException { + System.out.println("pace behavior"); + countDown = new CountDownLatch(1); + initPaceFlowRule(); + simulatePulseFlow(); + countDown.await(); + + System.out.println("done"); + System.out.println("total pass:" + pass.get() + ", total block:" + block.get()); + + System.out.println(); + System.out.println("default behavior"); + TimeUnit.SECONDS.sleep(5); + done.set(0); + pass.set(0); + block.set(0); + countDown = new CountDownLatch(1); + initDefaultFlowRule(); + simulatePulseFlow(); + countDown.await(); + System.out.println("done"); + System.out.println("total pass:" + pass.get() + ", total block:" + block.get()); + System.exit(0); + } + + private static void initPaceFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + rule1.setCount(count); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + /* + * CONTROL_BEHAVIOR_RATE_LIMITER means requests more than threshold will be queueing in the queue, + * until the queueing time is more than {@link FlowRule#maxQueueingTimeMs}, the requests will be rejected. + */ + rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); + rule1.setMaxQueueingTimeMs(20 * 1000); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + private static void initDefaultFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + rule1.setCount(count); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + // CONTROL_BEHAVIOR_DEFAULT means requests more than threshold will be rejected immediately. + rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + private static void simulatePulseFlow() { + for (int i = 0; i < requestQps; i++) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + long startTime = TimeUtil.currentTimeMillis(); + Entry entry = null; + try { + entry = SphU.entry(KEY); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + if (entry != null) { + entry.exit(); + pass.incrementAndGet(); + long cost = TimeUtil.currentTimeMillis() - startTime; + System.out.println( + TimeUtil.currentTimeMillis() + " one request pass, cost " + cost + " ms"); + } + } + + try { + TimeUnit.MILLISECONDS.sleep(5); + } catch (InterruptedException e1) { + // ignore + } + + if (done.incrementAndGet() >= requestQps) { + countDown.countDown(); + } + } + }, "Thread " + i); + thread.start(); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpFlowDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpFlowDemo.java new file mode 100644 index 00000000..2b92c714 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpFlowDemo.java @@ -0,0 +1,224 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * When {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#controlBehavior} set to {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_WARM_UP}, real passed qps will + * gradually increase to {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#count}, other than burst increasing. + *

+ * Run this demo, results are as follows: + *

+ * ...
+ * 1530497805902, total:1, pass:1, block:0 // run in slow qps
+ * 1530497806905, total:3, pass:3, block:0
+ * 1530497807909, total:2, pass:2, block:0
+ * 1530497808913, total:3, pass:3, block:0
+ * 1530497809917, total:269, pass:6, block:263 // request qps burst increase, warm up behavior triggered.
+ * 1530497810917, total:3676, pass:7, block:3669
+ * 1530497811919, total:3734, pass:9, block:3725
+ * 1530497812920, total:3692, pass:9, block:3683
+ * 1530497813923, total:3642, pass:10, block:3632
+ * 1530497814926, total:3685, pass:10, block:3675
+ * 1530497815930, total:3671, pass:11, block:3660
+ * 1530497816933, total:3660, pass:15, block:3645
+ * 1530497817936, total:3681, pass:21, block:3661 // warm up process end, pass qps increased to {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#count}
+ * 1530497818940, total:3737, pass:20, block:3716
+ * 1530497819945, total:3663, pass:20, block:3643
+ * 1530497820950, total:3723, pass:21, block:3702
+ * 1530497821954, total:3680, pass:20, block:3660
+ * ...
+ * 
+ * + * @author jialiang.linjl + */ +public class WarmUpFlowDemo { + + private static final String KEY = "abc"; + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + + private static volatile boolean stop = false; + + private static final int threadCount = 100; + private static int seconds = 60 + 40; + + public static void main(String[] args) throws Exception { + initFlowRule(); + // trigger Sentinel internal init + Entry entry = null; + try { + entry = SphU.entry(KEY); + } catch (Exception e) { + } finally { + if (entry != null) { + entry.exit(); + } + } + + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + + //first make the system run on a very low condition + for (int i = 0; i < 3; i++) { + Thread t = new Thread(new WarmUpTask()); + t.setName("sentinel-warmup-task"); + t.start(); + } + Thread.sleep(20000); + + /* + * Start more thread to simulate more qps. Since we use {@link RuleConstant.CONTROL_BEHAVIOR_WARM_UP} as + * {@link FlowRule#controlBehavior}, real passed qps will increase to {@link FlowRule#count} in + * {@link FlowRule#warmUpPeriodSec} seconds. + */ + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new RunTask()); + t.setName("sentinel-run-task"); + t.start(); + } + } + + private static void initFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP); + rule1.setWarmUpPeriodSec(10); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + static class WarmUpTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + // token acquired, means pass + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class RunTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class TimerTask implements Runnable { + + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("begin to statistic!!!"); + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock); + if (seconds-- <= 0) { + stop = true; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total:" + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpRateLimiterFlowDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpRateLimiterFlowDemo.java new file mode 100644 index 00000000..775f21ab --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/flow/WarmUpRateLimiterFlowDemo.java @@ -0,0 +1,214 @@ +package example.spring.ratelimit.sentinel.flow; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * When {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#controlBehavior} set to {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER}, real passed + * qps will gradually increase to {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#count}, other than burst increasing, and after the passed qps reaches + * the threshold, the request will pass at a constant interval. + *

+ * In short, {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER} behaves like + * {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_WARM_UP} + {@link com.alibaba.csp.sentinel.slots.block.RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER}. + *

+ * + *

+ * Run this demo, results are as follows: + *

+ * ...
+ * 1541035848056, total:5, pass:5, block:0 // run in slow qps
+ * 1541035849061, total:0, pass:0, block:0
+ * 1541035850066, total:6, pass:6, block:0
+ * 1541035851068, total:2, pass:2, block:0
+ * 1541035852073, total:3, pass:3, block:0
+ * 1541035853078, total:3361, pass:7, block:3354 // request qps burst increase, warm up behavior triggered.
+ * 1541035854083, total:3414, pass:7, block:3407
+ * 1541035855087, total:3377, pass:7, block:3370
+ * 1541035856091, total:3366, pass:8, block:3358
+ * 1541035857096, total:3259, pass:8, block:3251
+ * 1541035858101, total:3066, pass:13, block:3054
+ * 1541035859105, total:3042, pass:15, block:3026
+ * 1541035860109, total:2946, pass:17, block:2929
+ * 1541035861113, total:2909, pass:20, block:2889 // warm up process end, pass qps increased to {@link com.alibaba.csp.sentinel.slots.block.flow.FlowRule#count}
+ * 1541035862117, total:2970, pass:20, block:2950
+ * 1541035863122, total:2919, pass:20, block:2899
+ * 1541035864127, total:2903, pass:21, block:2882
+ * 1541035865133, total:2930, pass:20, block:2910
+ * ...
+ * 
+ * + * @author CarpenterLee + * @see WarmUpFlowDemo + * @see PaceFlowDemo + */ +public class WarmUpRateLimiterFlowDemo { + private static final String KEY = "abc"; + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + + private static volatile boolean stop = false; + + private static final int threadCount = 100; + private static int seconds = 100; + + public static void main(String[] args) throws Exception { + initFlowRule(); + // trigger Sentinel internal init + Entry entry = null; + try { + entry = SphU.entry(KEY); + } catch (Exception e) { + } finally { + if (entry != null) { + entry.exit(); + } + } + + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + + //first make the system run on a very low condition + for (int i = 0; i < 3; i++) { + Thread t = new Thread(new SlowTask()); + t.setName("sentinel-slow-task"); + t.start(); + } + Thread.sleep(5000); + + // request qps burst increase, warm up behavior triggered. + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new RunTask()); + t.setName("sentinel-run-task"); + t.start(); + } + } + + private static void initFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER); + rule1.setWarmUpPeriodSec(10); + rule1.setMaxQueueingTimeMs(100); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + static class SlowTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + // token acquired, means pass + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class RunTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class TimerTask implements Runnable { + + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("begin to statistic!!!"); + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock); + if (seconds-- <= 0) { + stop = true; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total:" + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/system/SystemGuardDemo.java b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/system/SystemGuardDemo.java new file mode 100644 index 00000000..22c943d3 --- /dev/null +++ b/codes/distributed/ratelimit/sentinel/basic/src/main/java/example/spring/ratelimit/sentinel/system/SystemGuardDemo.java @@ -0,0 +1,143 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.spring.ratelimit.sentinel.system; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.system.SystemRule; +import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jialiang.linjl + */ +public class SystemGuardDemo { + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + + private static volatile boolean stop = false; + private static final int threadCount = 100; + + private static int seconds = 60 + 40; + + public static void main(String[] args) throws Exception { + + tick(); + initSystemRule(); + + for (int i = 0; i < threadCount; i++) { + Thread entryThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Entry entry = null; + try { + entry = SphU.entry("methodA", EntryType.IN); + pass.incrementAndGet(); + try { + TimeUnit.MILLISECONDS.sleep(20); + } catch (InterruptedException e) { + // ignore + } + } catch (BlockException e1) { + block.incrementAndGet(); + try { + TimeUnit.MILLISECONDS.sleep(20); + } catch (InterruptedException e) { + // ignore + } + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + } + } + + }); + entryThread.setName("working-thread"); + entryThread.start(); + } + } + + private static void initSystemRule() { + SystemRule rule = new SystemRule(); + // max load is 3 + rule.setHighestSystemLoad(3.0); + // max cpu usage is 60% + rule.setHighestCpuUsage(0.6); + // max avg rt of all request is 10 ms + rule.setAvgRt(10); + // max total qps is 20 + rule.setQps(20); + // max parallel working thread is 10 + rule.setMaxThread(10); + + SystemRuleManager.loadRules(Collections.singletonList(rule)); + } + + private static void tick() { + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + } + + static class TimerTask implements Runnable { + @Override + public void run() { + System.out.println("begin to statistic!!!"); + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:" + + oneSecondTotal + ", pass:" + + oneSecondPass + ", block:" + oneSecondBlock); + if (seconds-- <= 0) { + stop = true; + } + } + System.exit(0); + } + } +} diff --git a/codes/distributed/ratelimit/sentinel/pom.xml b/codes/distributed/ratelimit/sentinel/pom.xml index 17ca1930..1e9cc23c 100644 --- a/codes/distributed/ratelimit/sentinel/pom.xml +++ b/codes/distributed/ratelimit/sentinel/pom.xml @@ -16,6 +16,7 @@ Spring::分布式::流量控制::Sentinel + basic api provider consumer diff --git a/codes/distributed/ratelimit/sentinel/provider/pom.xml b/codes/distributed/ratelimit/sentinel/provider/pom.xml index b556cc2e..fe468135 100644 --- a/codes/distributed/ratelimit/sentinel/provider/pom.xml +++ b/codes/distributed/ratelimit/sentinel/provider/pom.xml @@ -61,22 +61,22 @@ com.alibaba.csp sentinel-core - 1.8.7 + 2.0.0-alpha com.alibaba.csp sentinel-transport-simple-http - 1.8.7 + 2.0.0-alpha com.alibaba.csp sentinel-annotation-aspectj - 1.8.7 + 2.0.0-alpha com.alibaba.csp sentinel-parameter-flow-control - 1.8.7 + 2.0.0-alpha diff --git a/codes/distributed/ratelimit/sentinel/provider/src/main/java/example/spring/ratelimit/sentinel/RateLimitController.java b/codes/distributed/ratelimit/sentinel/provider/src/main/java/example/spring/ratelimit/sentinel/RateLimitController.java index 822a7f5e..e32e4a48 100644 --- a/codes/distributed/ratelimit/sentinel/provider/src/main/java/example/spring/ratelimit/sentinel/RateLimitController.java +++ b/codes/distributed/ratelimit/sentinel/provider/src/main/java/example/spring/ratelimit/sentinel/RateLimitController.java @@ -3,7 +3,8 @@ import com.alibaba.csp.sentinel.annotation.SentinelResource; import com.alibaba.csp.sentinel.slots.block.BlockException; import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -19,27 +20,38 @@ public class RateLimitController { /** * 注解方式定义资源并限流 */ - @GetMapping("/limit/http") - @SentinelResource(value = "Http限流", blockHandler = "httpLimitBlockHandler") - public String limitHttp() { + @RequestMapping("/flow/limit") + @SentinelResource(value = "流量控制", blockHandler = "flowLimitBlockHandler") + public String flowLimit() { try { - log.info("Http限流 -> 请求通过"); + log.info("流量控制 -> 请求通过"); return "ok"; } catch (Exception e) { - log.error("Http限流 -> 请求异常", e); + log.error("流量控制 -> 请求异常", e); return "failed"; } } - public String httpLimitBlockHandler(BlockException e) { - log.warn("Http限流 -> 请求限流", e); + public String flowLimitBlockHandler(BlockException e) { + log.warn("流量控制 -> 请求限流", e); return "failed"; } - @GetMapping("/limit/http") - @SentinelResource(value = "Http限流", blockHandler = "httpLimitBlockHandler") - public void paramFlowControl() { + @RequestMapping("/param/flow/limit") + @SentinelResource(value = "热点参数流量控制", blockHandler = "paramFlowLimitBlockHandler") + public String paramFlowLimit(@RequestParam("key1") String key1, @RequestParam("key2") Integer key2) { + try { + log.info("热点参数流量控制 -> 请求通过"); + return "ok"; + } catch (Exception e) { + log.error("热点参数流量控制 -> 请求异常", e); + return "failed"; + } + } + public String paramFlowLimitBlockHandler(String key1, Integer key2, BlockException e) { + log.warn("热点参数流量控制 -> 请求限流", e); + return "failed"; } }