Skip to content

Commit

Permalink
support asynchronous call
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingSinger committed Oct 25, 2019
1 parent 751e796 commit 22c6802
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 90 deletions.
16 changes: 15 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<properties>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>

<maven.compiler.plugin>3.6.0</maven.compiler.plugin>
<sentinel.version>1.6.0</sentinel.version>
<apache.dubbo.version>2.7.1</apache.dubbo.version>

Expand Down Expand Up @@ -52,4 +52,18 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin}</version>
<configuration>
<source>${java.source.version}</source>
<target>${java.target.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DubboFallbackRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.support.RpcUtils;

/**
* <p>Dubbo service consumer filter for Sentinel. Auto activated by default.</p>
*
* <p>
* If you want to disable the consumer filter, you can configure:
* <pre>
* &lt;dubbo:consumer filter="-sentinel.dubbo.consumer.filter"/&gt;
Expand All @@ -42,42 +41,39 @@
* @author Eric Zhao
*/
@Activate(group = "consumer")
public class SentinelDubboConsumerFilter implements Filter {
public class BaseSentinelDubboConsumerFilter extends BaseSentinelDubboFilter {

public SentinelDubboConsumerFilter() {
public BaseSentinelDubboConsumerFilter() {
RecordLog.info("Sentinel Apache Dubbo consumer filter initialized");
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Entry interfaceEntry = null;
Entry methodEntry = null;
RpcContext rpcContext = RpcContext.getContext();
try {
boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
String resourceName = DubboUtils.getResourceName(invoker, invocation);
interfaceEntry = SphU.entry(invoker.getInterface().getName(), EntryType.OUT);
methodEntry = SphU.entry(resourceName, EntryType.OUT);

Result result = invoker.invoke(invocation);
if (result.hasException()) {
Throwable e = result.getException();
// Record common exception.
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
if (!isAsync) {
interfaceEntry = SphU.entry(invoker.getUrl().getEncodedServiceKey(), EntryType.OUT);
methodEntry = SphU.entry(resourceName, EntryType.OUT);
} else {
// should generate the AsyncEntry when the invoke model in future or async
interfaceEntry = SphU.asyncEntry(invoker.getUrl().getEncodedServiceKey(), EntryType.OUT);
methodEntry = SphU.asyncEntry(resourceName, EntryType.OUT);
}
return result;
rpcContext.set(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY, interfaceEntry);
rpcContext.set(DubboUtils.DUBBO_METHOD_ENTRY_KEY, methodEntry);
return invoker.invoke(invocation);
} catch (BlockException e) {
return DubboFallbackRegistry.getConsumerFallback().handle(invoker, invocation, e);
} catch (RpcException e) {
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
// timeout or nonbiz-exception
trace(e, invocation);
throw e;
} finally {
if (methodEntry != null) {
methodEntry.exit();
}
if (interfaceEntry != null) {
interfaceEntry.exit();
}
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.alibaba.csp.sentinel.adapter.dubbo;


import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;

public abstract class BaseSentinelDubboFilter implements Filter {

@Override
public Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
//it's unnecessary to trace the business exception
trace(null, invocation);
return appResponse;
}


static void trace(Throwable throwable, Invocation invocation) {
Entry interfaceEntry = (Entry) RpcContext.getContext().get(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY);
Entry methodEntry = (Entry) RpcContext.getContext().get(DubboUtils.DUBBO_METHOD_ENTRY_KEY);
if (methodEntry != null) {
Tracer.traceEntry(throwable, methodEntry);
methodEntry.exit();
}
if (interfaceEntry != null) {
Tracer.traceEntry(throwable, interfaceEntry);
interfaceEntry.exit();
}
if (!(interfaceEntry instanceof AsyncEntry)) {
ContextUtil.exit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
public final class DubboUtils {

public static final String SENTINEL_DUBBO_APPLICATION_KEY = "dubboApplication";
public static final String DUBBO_METHOD_ENTRY_KEY = "dubboMethodEntry";
public static final String DUBBO_INTERFACE_ENTRY_KEY = "dubboInterfaceEntry";

public static String getApplication(Invocation invocation, String defaultValue) {
if (invocation == null || invocation.getAttachments() == null) {
Expand All @@ -34,7 +36,7 @@ public static String getApplication(Invocation invocation, String defaultValue)

public static String getResourceName(Invoker<?> invoker, Invocation invocation) {
StringBuilder buf = new StringBuilder(64);
buf.append(invoker.getInterface().getName())
buf.append(invoker.getUrl().getEncodedServiceKey())
.append(":")
.append(invocation.getMethodName())
.append("(");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DubboFallbackRegistry;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

/**
* <p>Apache Dubbo service provider filter that enables integration with Sentinel. Auto activated by default.</p>
* <p>Note: this only works for Apache Dubbo 2.7.x or above version.</p>
*
* <p>
* If you want to disable the provider filter, you can configure:
* <pre>
* &lt;dubbo:provider filter="-sentinel.dubbo.provider.filter"/&gt;
Expand All @@ -44,7 +42,7 @@
* @author Eric Zhao
*/
@Activate(group = "provider")
public class SentinelDubboProviderFilter implements Filter {
public class SentinelDubboProviderFilter extends BaseSentinelDubboFilter {

public SentinelDubboProviderFilter() {
RecordLog.info("Sentinel Apache Dubbo provider filter initialized");
Expand All @@ -54,40 +52,28 @@ public SentinelDubboProviderFilter() {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// Get origin caller.
String application = DubboUtils.getApplication(invocation, "");

RpcContext rpcContext = RpcContext.getContext();
Entry interfaceEntry = null;
Entry methodEntry = null;
try {
String resourceName = DubboUtils.getResourceName(invoker, invocation);
String interfaceName = invoker.getInterface().getName();
String interfaceName = invoker.getUrl().getEncodedServiceKey();
// Only need to create entrance context at provider side, as context will take effect
// at entrance of invocation chain only (for inbound traffic).
ContextUtil.enter(resourceName, application);
interfaceEntry = SphU.entry(interfaceName, EntryType.IN);
methodEntry = SphU.entry(resourceName, EntryType.IN, 1, invocation.getArguments());

Result result = invoker.invoke(invocation);
if (result.hasException()) {
Throwable e = result.getException();
// Record common exception.
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
}
return result;
rpcContext.set(DubboUtils.DUBBO_INTERFACE_ENTRY_KEY, interfaceEntry);
rpcContext.set(DubboUtils.DUBBO_METHOD_ENTRY_KEY, methodEntry);
return invoker.invoke(invocation);
} catch (BlockException e) {
return DubboFallbackRegistry.getProviderFallback().handle(invoker, invocation, e);
} catch (RpcException e) {
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
trace(e, invocation);
throw e;
} finally {
if (methodEntry != null) {
methodEntry.exit(1, invocation.getArguments());
}
if (interfaceEntry != null) {
interfaceEntry.exit();
}
ContextUtil.exit();
}
}


}

Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
sentinel.dubbo.provider.filter=com.alibaba.csp.sentinel.adapter.dubbo.SentinelDubboProviderFilter
sentinel.dubbo.consumer.filter=com.alibaba.csp.sentinel.adapter.dubbo.SentinelDubboConsumerFilter
sentinel.dubbo.consumer.filter=com.alibaba.csp.sentinel.adapter.dubbo.BaseSentinelDubboConsumerFilter
dubbo.application.context.name.filter=com.alibaba.csp.sentinel.adapter.dubbo.DubboAppContextFilter
21 changes: 16 additions & 5 deletions src/test/java/com/alibaba/csp/sentinel/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import org.apache.dubbo.rpc.RpcContext;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* Base test class, provide common methods for subClass
* The package is same as CtSph, to call CtSph.resetChainMap() method for test
*
* <p>
* Note: Only for test. DO NOT USE IN PRODUCTION!
*
* @author cdfive
Expand All @@ -33,8 +36,16 @@ public class BaseTest {
* Clean up resources for context, clusterNodeMap, processorSlotChainMap
*/
protected static void cleanUpAll() {
RpcContext.removeContext();
ClusterBuilderSlot.getClusterNodeMap().clear();
CtSph.resetChainMap();
try {
RpcContext.removeContext();
ClusterBuilderSlot.getClusterNodeMap().clear();
CtSph.resetChainMap();
Method method = ContextUtil.class.getDeclaredMethod("resetContextMap");
method.setAccessible(true);
method.invoke(null, null);
ContextUtil.exit();
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.alibaba.csp.sentinel.adapter.dubbo.DubboUtils;
import com.alibaba.csp.sentinel.adapter.dubbo.provider.DemoService;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.junit.Test;
Expand Down Expand Up @@ -64,7 +66,11 @@ public void testGetApplicationNoAttachments() {
@Test
public void testGetResourceName() {
Invoker invoker = mock(Invoker.class);
when(invoker.getInterface()).thenReturn(DemoService.class);
URL url = URL.valueOf("dubbo://127.0.0.1:2181")
.addParameter(Constants.VERSION_KEY,"1.0.0")
.addParameter(Constants.GROUP_KEY,"grp1")
.addParameter(Constants.INTERFACE_KEY,DemoService.class.getName());
when(invoker.getUrl()).thenReturn(url);

Invocation invocation = mock(Invocation.class);
Method method = DemoService.class.getMethods()[0];
Expand All @@ -73,6 +79,6 @@ public void testGetResourceName() {

String resourceName = DubboUtils.getResourceName(invoker, invocation);

assertEquals("com.alibaba.csp.sentinel.adapter.dubbo.provider.DemoService:sayHello(java.lang.String,int)", resourceName);
assertEquals("grp1*com.alibaba.csp.sentinel.adapter.dubbo.provider.DemoService:1.0.0:sayHello(java.lang.String,int)", resourceName);
}
}
Loading

0 comments on commit 22c6802

Please sign in to comment.