diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index e54fcf86a35..c96207f9c4c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -91,6 +91,11 @@ public class Constants { public static final String $INVOKE = "$invoke"; + /** + * aysnc generic invoke method name. + */ + public static final String $INVOKE_ASYNC = "$invokeAsync"; + public static final String $ECHO = "$echo"; public static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32); @@ -357,6 +362,7 @@ public class Constants { public static final String ASYNC_KEY = "async"; public static final String FUTURE_GENERATED_KEY = "future_generated"; + public static final String FUTURE_RETURNTYPE_KEY = "future_returntype"; public static final String ASYNC_SUFFIX = "Async"; diff --git a/dubbo-compatible/src/test/java/org/apache/dubbo/generic/GenericServiceTest.java b/dubbo-compatible/src/test/java/org/apache/dubbo/generic/GenericServiceTest.java index 09d62cc067c..f7d7f5aa672 100644 --- a/dubbo-compatible/src/test/java/org/apache/dubbo/generic/GenericServiceTest.java +++ b/dubbo-compatible/src/test/java/org/apache/dubbo/generic/GenericServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dubbo.generic; +import com.alibaba.fastjson.JSON; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.metadata.definition.ServiceDefinitionBuilder; @@ -32,9 +33,6 @@ import org.apache.dubbo.service.ComplexObject; import org.apache.dubbo.service.DemoService; import org.apache.dubbo.service.DemoServiceImpl; - -import com.alibaba.fastjson.JSON; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class GenericServiceTest { @@ -66,6 +65,26 @@ public void testGeneric() { exporter.unexport(); } + @Test + public void asyncGenericCall() throws Exception { + DemoService server = new DemoServiceImpl(); + ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); + Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); + URL url = URL.valueOf("dubbo://127.0.0.1:5342/" + DemoService.class.getName() + "?version=1.0.0"); + Exporter exporter = protocol.export(proxyFactory.getInvoker(server, DemoService.class, url)); + Invoker invoker = protocol.refer(DemoService.class, url); + + GenericService client = (GenericService) proxyFactory.getProxy(invoker, true); + CompletableFuture future = client.$invokeAsync("sayHello", new String[]{"java.lang.String"}, new Object[]{"haha"}); + + // it will print hello haha and then pass the assert. + future.whenComplete((o, throwable) -> System.out.println(o)); + Assertions.assertEquals("hello haha", future.get()); + + invoker.destroy(); + exporter.unexport(); + } + @Test public void testGeneric2() { DemoService server = new DemoServiceImpl(); @@ -107,7 +126,7 @@ public void testGenericComplexCompute4FullServiceMetadata() { FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(DemoService.class); MethodDefinition methodDefinition = getMethod("complexCompute", fullServiceDefinition.getMethods()); - Map mapObject = createComplexObject(fullServiceDefinition,var1, var2, l, var3, var4, testEnum); + Map mapObject = createComplexObject(fullServiceDefinition, var1, var2, l, var3, var4, testEnum); ComplexObject complexObject = map2bean(mapObject); Invoker invoker = protocol.refer(GenericService.class, url); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java index ae7b92c40f1..a48e0381f44 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java @@ -51,7 +51,9 @@ public class GenericFilter implements Filter { @Override public Result invoke(Invoker invoker, Invocation inv) throws RpcException { - if (inv.getMethodName().equals(Constants.$INVOKE) + boolean asyncGeneric = inv.getMethodName().equals(Constants.$INVOKE_ASYNC); + boolean genericMethod = inv.getMethodName().equals(Constants.$INVOKE) || asyncGeneric; + if (genericMethod && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { @@ -76,7 +78,7 @@ public Result invoke(Invoker invoker, Invocation inv) throws RpcException { } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (byte[].class == args[i].getClass()) { - try(UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) { + try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) { args[i] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA) .deserialize(null, is).readObject(); @@ -108,7 +110,14 @@ public Result invoke(Invoker invoker, Invocation inv) throws RpcException { } } } - Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments())); + RpcInvocation realInvocation = new RpcInvocation(method, args, inv.getAttachments()); + if (asyncGeneric) { + // We need to remove FUTURE_RETURNTYPE_KEY to ensure that the + // server return value is still a normal type instead of a Future. + // In addition, the server does not provide the function of asynchronous generic invoke. + realInvocation.getAttachments().remove(Constants.FUTURE_RETURNTYPE_KEY); + } + Result result = invoker.invoke(realInvocation); if (result.hasException() && !(result.getException() instanceof GenericException)) { return new RpcResult(new GenericException(result.getException())); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/service/GenericService.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/service/GenericService.java index 07517d474a4..b9883831f6e 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/service/GenericService.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/service/GenericService.java @@ -16,10 +16,11 @@ */ package org.apache.dubbo.rpc.service; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + /** * Generic service interface - * - * @export */ public interface GenericService { @@ -35,4 +36,17 @@ public interface GenericService { */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; + /** + * Async generic invocation + * + * @param method method name, same with {@link #$invoke(String, String[], Object[])}'s method name. e.g. findPerson + * @param parameterTypes Parameter types + * @param args Arguments + * @return future type. We can use it with {@link CompletableFuture#whenComplete(BiConsumer)} + * @throws GenericException potential exception thrown from the invocation + */ + default CompletableFuture $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException { + return CompletableFuture.completedFuture($invoke(method, parameterTypes, args)); + } + } \ No newline at end of file