Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]finish cluster concurrent flow control rule checker #1631

Merged
merged 4 commits into from
Sep 16, 2020

Conversation

yunfeiyanggzq
Copy link
Contributor

@yunfeiyanggzq yunfeiyanggzq commented Jul 27, 2020

Signed-off-by: yunfeiyanggzq [email protected]

Describe what this PR does / why we need it

完成集群并发流控流控规则源
图片

Does this pull request fix one issue?

Fixes #1629 此处有项目整体代码和测试说明

Describe how you did it

本pr是集群并发流控的一部分代码,主要完成DefaultTokenService,本代码并没有接入FlowSlot的FlowCheker中,所以对现有的业务并没有发生任何的改变。主要的功能是实现了集群并发流控的规则源,不能进行netty通信

改造sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java,调整集群流控规则,增加并发流控的相应控制参数
改造sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java 主要适配并发集群流控中的一些参数检测。
新增sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java 维护每个rule所对应的当前并发量
新增sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java 存储目前的token信息
新增sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java 删除过期token
新增sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java 主要对token是否能分发进行检测和回收token

server端新增接口

 /**
     * Request acquire concurrent tokens from remote token server.
     *
     * @param clientAddress the address of the request belong.
     * @param ruleId ruleId the unique rule ID
     * @param acquireCount token count to acquire
     * @return result of the token request
     */
    TokenResult requestConcurrentToken(String clientAddress,Long ruleId,int acquireCount);
    /**
     * Request lease concurrent tokens from remote token server.
     *
     * @param tokenId the unique token ID
     */
    void releaseConcurrentToken(Long tokenId);

Describe how to verify it

查看单元测试。

Special notes for reviews

@yunfeiyanggzq yunfeiyanggzq force-pushed the savepoint7.27-2 branch 3 times, most recently from 6d8cc45 to d74c0bc Compare July 27, 2020 09:41
@sczyh30 sczyh30 added area/cluster-flow Issues or PRs related to cluster flow control kind/feature Category issues or prs related to feature request. size/XXL Indicate a PR that changes 1000+ lines. to-review To review labels Jul 27, 2020
@sczyh30 sczyh30 requested review from sczyh30, a team, cdfive and jasonjoo2010 July 27, 2020 10:58
}
}
ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to pay attention to the memory and GC footprint when there are large amount of requests coming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the max memory used is decided by rules*maxConcurrency, I think memory used is controllable.every token information is small and the token can be replaced fastly during the process of acquiring and releasing, there may be some young gc in the case you mention, I think it can be acceptable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may provide benchmark of the scenario. Frequent YGC may need optimizing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it, Thanks !

Copy link
Contributor Author

@yunfeiyanggzq yunfeiyanggzq Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

图片
Because the process is too short, so the image is not statistically accurate.
图片
图片

the code is as follows:

       @Before
    public void setUp() {
        FlowRule rule = new FlowRule();
        ClusterFlowConfig config = new ClusterFlowConfig();
        config.setResourceTimeout(500);
        config.setClientOfflineTime(1000);
        config.setFlowId(111L);
        config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
        rule.setClusterConfig(config);
        rule.setClusterMode(true);
        rule.setCount(1000);
        rule.setResource("test");
        rule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        ArrayList<FlowRule> rules = new ArrayList<>();
        rules.add(rule);
        ClusterFlowRuleManager.registerPropertyIfAbsent("1-name");
        ClusterFlowRuleManager.loadRules("1-name", rules);
    }
   
   @Test
    public void testConcurrentAcquireAndRelease() throws InterruptedException {
        setCurrentMillis(System.currentTimeMillis());
        final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
        final CountDownLatch countDownLatch = new CountDownLatch(1000000);
        ExecutorService pool = Executors.newFixedThreadPool(100);
        final AtomicInteger success=new AtomicInteger(0);
        for (long i = 0; i < 1000000; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    assert rule != null;
                    TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
                    Assert.assertTrue("concurrent control fail", CurrentConcurrencyManager.get(111L).get() <= rule.getCount());
                    if (result.getStatus() == TokenResultStatus.OK) {
                        success.incrementAndGet();
                        ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId());
                    }
                    countDownLatch.countDown();
                }
            };
            pool.execute(task);
        }
        countDownLatch.await();
        pool.shutdown();
        System.out.println(success.get()+"成功的");
        assert rule != null;
        Assert.assertTrue("fail to acquire and release token",
                CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

每秒请求量越小gc越少

@sczyh30
Copy link
Member

sczyh30 commented Aug 4, 2020

And some of the logs should be improved (e.g. the scope, content).

@yunfeiyanggzq
Copy link
Contributor Author

And some of the logs should be improved (e.g. the scope, content).

I will do with the review as soon as possible

@yunfeiyanggzq
Copy link
Contributor Author

I have modify the code with the opinion of @sczyh30 ,please review again @sczyh30 @jasonjoo2010 @cdfive ,Thanks!

Copy link
Member

@sczyh30 sczyh30 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. We may improve the commits later.

@sczyh30 sczyh30 merged commit 4feb16a into alibaba:master Sep 16, 2020
@sczyh30
Copy link
Member

sczyh30 commented Sep 16, 2020

Nice work. Thanks for contributing!

@sczyh30 sczyh30 removed the to-review To review label Sep 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/cluster-flow Issues or PRs related to cluster flow control kind/feature Category issues or prs related to feature request. priority/high Very important, need to be worked with soon but not very urgent size/XXL Indicate a PR that changes 1000+ lines.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants