Skip to content

Commit

Permalink
add cluster concurrent flow control demo
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyanggzq <[email protected]>
  • Loading branch information
yunfeiyanggzq committed Sep 4, 2020
1 parent f5c2e1a commit cb1921c
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-demo-cluster</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-demo-cluster-server-concurrent</artifactId>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-server-default</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-client-default</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.cluster;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.server.ClusterTokenServer;
import com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>Cluster server demo (alone mode).</p>
* <p>Here we init the cluster server dynamic data sources in
* {@link com.alibaba.csp.sentinel.demo.cluster.init.DemoClusterServerInitFunc}.</p>
*
* @author Eric Zhao
* @since 1.4.0
*/
public class ClusterClientDemo {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ExecutorService pool = Executors.newFixedThreadPool(100);

public static void main(String[] args) throws Exception {
ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
AtomicInteger success = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(100000);
ClusterStateManager.setToClient();
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
Entry entry = null;
try {
entry = SphU.entry("cluster-resource2");
System.out.println("pass");
success.incrementAndGet();
} catch (Exception ex) {
System.out.println("block");
} finally {
countDownLatch.countDown();
if (entry != null) {
entry.exit();
}
}
}
};
pool.execute(task);
}
countDownLatch.await();
pool.shutdown();
System.out.println("the count of pass: " + success.get() + " | time use: " + (System.currentTimeMillis() - start));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.cluster;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.server.ClusterTokenServer;
import com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>Cluster server demo (alone mode).</p>
* <p>Here we init the cluster server dynamic data sources in
* {@link com.alibaba.csp.sentinel.demo.cluster.init.DemoClusterServerInitFunc}.</p>
*
* @author Eric Zhao
* @since 1.4.0
*/
public class ClusterServerDemo {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ExecutorService pool = Executors.newFixedThreadPool(100);

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ExecutorService monitor = Executors.newFixedThreadPool(1);

public static void main(String[] args) throws Exception {
ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
AtomicInteger success = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(100);
ClusterStateManager.setToServer();
long start = System.currentTimeMillis();

Runnable monitorTask = new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
System.out.println("the concurrency of the rule{flowId:222}: " + CurrentConcurrencyManager.get(222L).get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
monitor.submit(monitorTask);

for (int i = 0; i < 100; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
Entry entry = null;
try {
entry = SphU.entry("cluster-resource2");
System.out.println("pass");
success.incrementAndGet();
} catch (Exception ex) {
System.out.println("block");
} finally {
countDownLatch.countDown();
if (entry != null) {
entry.exit();
}
}
}
};
pool.execute(task);
}

countDownLatch.await();
pool.shutdown();
System.out.println("the count of pass: " + success.get() + " | time use: " + (System.currentTimeMillis() - start));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.cluster.init;

import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.nacos.NacosDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import java.util.Collections;
import java.util.List;

/**
* @author Eric Zhao
*/
public class DemoClusterServerInitFunc implements InitFunc {

private static final String remoteAddress = "localhost:8848";
private static final String groupId = "SENTINEL_GROUP";
public static final String APP_NAME = "appA";
public static final String FLOW_POSTFIX = "-flow-rules";


//the rule is as follows:
//[
// {
// "clusterConfig":{
// "acquireRefuseStrategy":0,
// "clientOfflineTime":1000,
// "fallbackToLocalWhenFail": true,
// "flowId":222,
// "resourceTimeout":1000,
// "resourceTimeoutStrategy":0,
// "sampleCount":1000,
// "strategy":0,
// "thresholdType":1,
// "windowIntervalMs":1000
// },
// "clusterMode":true,
// "controlBehavior":0,
// "count":40,
// "grade":0,
// "limitApp":"default",
// "maxQueueingTimeMs":1000,
// "resource":"cluster-resource2",
// "strategy":0,
// "warmUpPeriodSec":10
// },
//]


@Override
public void init() throws Exception {
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
namespace + FLOW_POSTFIX,
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
return ds.getProperty();
});

ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
APP_NAME + FLOW_POSTFIX,
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
FlowRuleManager.register2Property(ds.getProperty());
initClusterClient();
initClusterServer();
}

private static void initClusterServer() {
ServerTransportConfig ServerTransportConfig = new ServerTransportConfig(18730, 600);
ClusterServerConfigManager.loadGlobalTransportConfig(ServerTransportConfig);
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(APP_NAME));
}

private static void initClusterClient() {
ClusterClientConfig clusterClientConfig = new ClusterClientConfig();
clusterClientConfig.setRequestTimeout(1500);
ClusterClientConfigManager.applyNewConfig(clusterClientConfig);
ClusterClientAssignConfig clusterClientAssignConfig = new ClusterClientAssignConfig("127.0.0.1", 18730);
ClusterClientConfigManager.applyNewAssignConfig(clusterClientAssignConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.alibaba.csp.sentinel.demo.cluster.init.DemoClusterServerInitFunc

0 comments on commit cb1921c

Please sign in to comment.