Skip to content

Commit

Permalink
modify
Browse files Browse the repository at this point in the history
Signed-off-by: yunfeiyanggzq <[email protected]>
  • Loading branch information
yunfeiyanggzq committed Aug 21, 2020
1 parent 2748c04 commit 159b398
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.ReleaseTokenStrategy;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

Expand All @@ -34,7 +35,7 @@
/**
* @author yunfeiyanggzq
*/
public class ReleaseTokenStrategy implements com.alibaba.csp.sentinel.slots.block.flow.ReleaseTokenStrategy {
public class DefaultReleaseTokenStrategy implements ReleaseTokenStrategy {

/**
* the map to store the {@link Timeout}, if the {@link Timeout} is null, the ResourceTimeoutStrategy is RuleConstant.DEFAULT_RESOURCE_TIMEOUT_STRATEGY.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public static TokenResult acquireConcurrentToken(/*@Valid*/ String clientAddress
TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
tokenResult.setTokenId(node.getTokenId());
System.out.println("获得token" + tokenResult.getTokenId());
return tokenResult;
}

Expand All @@ -98,6 +97,5 @@ public static void releaseConcurrentToken(/*@Valid*/ long tokenId) {
AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
nowCalls.getAndAdd(-1 * acquireCount);
ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount);
System.out.println("释放token"+tokenId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/
package com.alibaba.csp.sentinel.slots.block.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.client.ReleaseTokenStrategyProvider;
import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider;
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.client.ReleaseTokenStrategyProvider;
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider;
import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.DefaultNode;
Expand All @@ -36,6 +35,8 @@
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.function.Function;

import java.util.Collection;

/**
* Rule checker for flow control rules.
*
Expand All @@ -59,16 +60,16 @@ public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, Resou
}

public void release(Context context) {
getReleaseTokenStrategy().releaseTokenWhenExitSlot(context);
exitSlot(context);
}

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
int acquireCount) {
int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
Expand Down Expand Up @@ -139,7 +140,7 @@ static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context

return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
Expand All @@ -158,7 +159,7 @@ private static boolean passClusterCheck(FlowRule rule, Context context, DefaultN
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
TokenResult result = requestToken(clusterService, rule, context, node, acquireCount, prioritized);
TokenResult result = requestToken(clusterService, rule, context, node, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
Expand Down Expand Up @@ -214,7 +215,7 @@ private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRul
case TokenResultStatus.OK:
// Store the tokenId and start a resource timeout timer to release the timeout token if necessary.
if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
getReleaseTokenStrategy().doWithResourceTimeoutToken(result, rule, context, node, acquireCount, prioritized);
setEntry(result, rule, context, node, acquireCount, prioritized);
}
return true;
case TokenResultStatus.SHOULD_WAIT:
Expand All @@ -236,7 +237,36 @@ private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRul
}
}

public static ReleaseTokenStrategy getReleaseTokenStrategy() {
private static ReleaseTokenStrategy getReleaseTokenStrategy() {
return ReleaseTokenStrategyProvider.getStrategy();
}

private static void setEntry(TokenResult result, FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
if (getReleaseTokenStrategy() == null) {
// if spi fail
Entry entry = context.getCurEntry();
if (entry != null) {
entry.setTokenId(result.getTokenId());
}
} else {
getReleaseTokenStrategy().doWithResourceTimeoutToken(result, rule, context, node, acquireCount, prioritized);
}
}

private static void exitSlot(Context context) {
if (getReleaseTokenStrategy() == null) {
// if spi fail
Entry entry = context.getCurEntry();
if (entry != null) {
long tokenId = entry.getTokenId();
TokenService server = pickClusterService();
if (server != null) {
server.releaseConcurrentToken(tokenId);
}
}
} else {
getReleaseTokenStrategy().releaseTokenWhenExitSlot(context);
}
}
}

0 comments on commit 159b398

Please sign in to comment.