Skip to content

Commit

Permalink
Add prioritized entry support in ProcessorSlot and SphU (alibaba#255)
Browse files Browse the repository at this point in the history
- Refactor the slot interface to support prioritized entry
- Add `entryWithPriority` in SphU

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 authored Nov 21, 2018
1 parent 26e0631 commit 4289105
Show file tree
Hide file tree
Showing 19 changed files with 123 additions and 65 deletions.
56 changes: 36 additions & 20 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ private AsyncEntry asyncEntryWithNoChain(ResourceWrapper resourceWrapper, Contex
return entry;
}

private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
private AsyncEntry asyncEntryWithPriorityInternal(ResourceWrapper resourceWrapper, int count, boolean prioritized,
Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
Expand All @@ -87,7 +88,7 @@ private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count

AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, args);
chain.entry(context, resourceWrapper, null, count, prioritized, args);
// Initiate the async context only when the entry successfully passed the slot chain.
asyncEntry.initAsyncContext();
// The asynchronous call may take time in background, and current context should not be hanged on it.
Expand All @@ -108,23 +109,12 @@ private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count
return asyncEntry;
}

/**
* Do all {@link Rule}s checking about the resource.
*
* <p>Each distinct resource will use a {@link ProcessorSlot} to do rules checking. Same resource will use
* same {@link ProcessorSlot} globally. </p>
*
* <p>Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
* otherwise no rules checking will do. In this condition, all requests will pass directly, with no checking
* or exception.</p>
*
* @param resourceWrapper resource name
* @param count tokens needed
* @param args arguments of user method call
* @return {@link Entry} represents this call
* @throws BlockException if any rule's threshold is exceeded
*/
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return asyncEntryWithPriorityInternal(resourceWrapper, count, false, args);
}

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
Expand Down Expand Up @@ -154,7 +144,7 @@ public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) t

Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, args);
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
Expand All @@ -165,6 +155,26 @@ public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) t
return e;
}

/**
* Do all {@link Rule}s checking about the resource.
*
* <p>Each distinct resource will use a {@link ProcessorSlot} to do rules checking. Same resource will use
* same {@link ProcessorSlot} globally. </p>
*
* <p>Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
* otherwise no rules checking will do. In this condition, all requests will pass directly, with no checking
* or exception.</p>
*
* @param resourceWrapper resource name
* @param count tokens needed
* @param args arguments of user method call
* @return {@link Entry} represents this call
* @throws BlockException if any rule's threshold is exceeded
*/
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}

/**
* Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
* be created if the resource doesn't relate one.
Expand Down Expand Up @@ -305,4 +315,10 @@ public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... a
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return asyncEntryInternal(resource, count, args);
}

@Override
public Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entryWithPriority(resource, count, prioritized);
}
}
14 changes: 14 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,18 @@ public interface Sph {
* @since 0.2.0
*/
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;

/**
* Create a protected resource with priority.
*
* @param name the unique name for the protected resource
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable
* @param count the count that the resource requires
* @param prioritized whether the entry is prioritized
* @return entry get
* @throws BlockException if the block criteria is met
* @since 1.4.0
*/
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;
}
25 changes: 25 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,29 @@ public static AsyncEntry asyncEntry(String name, EntryType type) throws BlockExc
public static AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException {
return Env.sph.asyncEntry(name, type, count, args);
}

/**
* Checking all {@link Rule}s related the resource. The entry is prioritized.
*
* @param name the unique name for the protected resource
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded.
* @since 1.4.0
*/
public static Entry entryWithPriority(String name) throws BlockException {
return Env.sph.entryWithPriority(name, EntryType.OUT, 1, true);
}

/**
* Checking all {@link Rule}s related the resource. The entry is prioritized.
*
* @param name the unique name for the protected resource
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable,
* only inbound traffic could be blocked by {@link SystemRule}
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded.
* @since 1.4.0
*/
public static Entry entryWithPriority(String name, EntryType type) throws BlockException {
return Env.sph.entryWithPriority(name, type, 1, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T>
private AbstractLinkedProcessorSlot<?> next = null;

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, args);
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}

@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, Object... args)
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, args);
entry(context, resourceWrapper, t, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, args);
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}

@Override
Expand Down Expand Up @@ -70,9 +70,9 @@ public AbstractLinkedProcessorSlot<?> getNext() {
}

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, args);
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* @author qinan.qn
* @author jialiang.linjl
* @author leyou(lihao)
* @author Eric Zhao
*/
public interface ProcessorSlot<T> {

Expand All @@ -31,26 +32,28 @@ public interface ProcessorSlot<T> {
*
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param param Generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
* @param param generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
* @param count tokens needed
* @param prioritized whether the entry is prioritized
* @param args parameters of the original call
* @throws Throwable blocked exception or unexpected error
*/
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args)
throws Throwable;
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;

/**
* Means finish of {@link #entry(Context, ResourceWrapper, Object, int, Object...)}.
* Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.
*
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param obj relevant object (e.g. Node)
* @param count tokens needed
* @param prioritized whether the entry is prioritized
* @param args parameters of the original call
* @throws Throwable blocked exception or unexpected error
*/
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
throws Throwable;
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;

/**
* Exit of this slot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.alibaba.csp.sentinel.node.IntervalProperty;

/***
/**
* @author youji.zj
* @author jialiang.linjl
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count);

fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode>
private ClusterNode clusterNode = null;

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
Expand All @@ -100,7 +100,7 @@ public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode
context.getCurEntry().setOriginNode(originNode);
}

fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, args);
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
/*
* It's interesting that we use context name rather resource name as the map key.
Expand Down Expand Up @@ -168,7 +168,7 @@ public void entry(Context context, ResourceWrapper resourceWrapper, Object obj,
}

context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);

// Request passed, add thread count and pass count.
node.increaseThreadNum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private MustBlockSlot addMustBlockSlot(ResourceWrapper resourceWrapper) {
private class ShouldNotPassSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count,
Object... args) {
boolean prioritized, Object... args) {
throw new IllegalStateException("Should not enter this slot!");
}

Expand All @@ -323,7 +323,7 @@ private class MustBlockSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count,
Object... args) throws Throwable {
boolean prioritized, Object... args) throws Throwable {
throw new BlockException("custom") {};
}

Expand All @@ -339,7 +339,7 @@ private class ShouldPassSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count,
Object... args) {
boolean prioritized, Object... args) {
entered = true;
}

Expand Down
Loading

0 comments on commit 4289105

Please sign in to comment.