Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async generic invoke support. #3637

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public class Constants {

public static final String $INVOKE = "$invoke";

/**
* aysnc generic invoke method name.
*/
public static final String $INVOKE_ASYNC = "$invokeAsync";

public static final String $ECHO = "$echo";

public static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
Expand Down Expand Up @@ -357,6 +362,7 @@ public class Constants {
public static final String ASYNC_KEY = "async";

public static final String FUTURE_GENERATED_KEY = "future_generated";

public static final String FUTURE_RETURNTYPE_KEY = "future_returntype";

public static final String ASYNC_SUFFIX = "Async";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dubbo.generic;


import com.alibaba.fastjson.JSON;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.metadata.definition.ServiceDefinitionBuilder;
Expand All @@ -32,9 +33,6 @@
import org.apache.dubbo.service.ComplexObject;
import org.apache.dubbo.service.DemoService;
import org.apache.dubbo.service.DemoServiceImpl;

import com.alibaba.fastjson.JSON;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -43,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class GenericServiceTest {

Expand All @@ -66,6 +65,26 @@ public void testGeneric() {
exporter.unexport();
}

@Test
public void asyncGenericCall() throws Exception {
DemoService server = new DemoServiceImpl();
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
URL url = URL.valueOf("dubbo://127.0.0.1:5342/" + DemoService.class.getName() + "?version=1.0.0");
Exporter<DemoService> exporter = protocol.export(proxyFactory.getInvoker(server, DemoService.class, url));
Invoker<DemoService> invoker = protocol.refer(DemoService.class, url);

GenericService client = (GenericService) proxyFactory.getProxy(invoker, true);
CompletableFuture<Object> future = client.$invokeAsync("sayHello", new String[]{"java.lang.String"}, new Object[]{"haha"});

// it will print hello haha and then pass the assert.
future.whenComplete((o, throwable) -> System.out.println(o));
Assertions.assertEquals("hello haha", future.get());

invoker.destroy();
exporter.unexport();
}

@Test
public void testGeneric2() {
DemoService server = new DemoServiceImpl();
Expand Down Expand Up @@ -107,7 +126,7 @@ public void testGenericComplexCompute4FullServiceMetadata() {

FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(DemoService.class);
MethodDefinition methodDefinition = getMethod("complexCompute", fullServiceDefinition.getMethods());
Map mapObject = createComplexObject(fullServiceDefinition,var1, var2, l, var3, var4, testEnum);
Map mapObject = createComplexObject(fullServiceDefinition, var1, var2, l, var3, var4, testEnum);
ComplexObject complexObject = map2bean(mapObject);

Invoker<GenericService> invoker = protocol.refer(GenericService.class, url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public class GenericFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
boolean asyncGeneric = inv.getMethodName().equals(Constants.$INVOKE_ASYNC);
boolean genericMethod = inv.getMethodName().equals(Constants.$INVOKE) || asyncGeneric;
if (genericMethod
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
Expand All @@ -76,7 +78,7 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try(UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
Expand Down Expand Up @@ -108,7 +110,14 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
}
}
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
RpcInvocation realInvocation = new RpcInvocation(method, args, inv.getAttachments());
if (asyncGeneric) {
// We need to remove FUTURE_RETURNTYPE_KEY to ensure that the
// server return value is still a normal type instead of a Future.
// In addition, the server does not provide the function of asynchronous generic invoke.
realInvocation.getAttachments().remove(Constants.FUTURE_RETURNTYPE_KEY);
}
Result result = invoker.invoke(realInvocation);
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package org.apache.dubbo.rpc.service;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* Generic service interface
*
* @export
*/
public interface GenericService {

Expand All @@ -35,4 +36,17 @@ public interface GenericService {
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;

/**
* Async generic invocation
*
* @param method method name, same with {@link #$invoke(String, String[], Object[])}'s method name. e.g. findPerson
* @param parameterTypes Parameter types
* @param args Arguments
* @return future type. We can use it with {@link CompletableFuture#whenComplete(BiConsumer)}
* @throws GenericException potential exception thrown from the invocation
*/
default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
return CompletableFuture.completedFuture($invoke(method, parameterTypes, args));
}

}