diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java index 315cf98d0e0..c54352571d2 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java @@ -29,8 +29,8 @@ public class AsyncRpcResult implements Result { private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class); /** - * RpcContext can be changed, because thread may have been used by other thread. It should be cloned before store. - * So we use Invocation instead, Invocation will create for every invoke, but invocation only support attachments of string type. + * RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call. + * So we should keep the reference of current RpcContext instance and restore it before callback being executed. */ private RpcContext storedContext; private RpcContext storedServerContext; @@ -39,18 +39,14 @@ public class AsyncRpcResult implements Result { private Invocation invocation; public AsyncRpcResult(CompletableFuture future) { - this.resultFuture = future; - // employ copy of context avoid the other call may modify the context content - this.storedContext = RpcContext.getContext().copyOf(); - this.storedServerContext = RpcContext.getServerContext().copyOf(); + this(future, null); } public AsyncRpcResult(CompletableFuture future, Invocation invocation) { this.resultFuture = future; this.invocation = invocation; - // employ copy of context avoid the other call may modify the context content - this.storedContext = RpcContext.getContext().copyOf(); - this.storedServerContext = RpcContext.getServerContext().copyOf(); + this.storedContext = RpcContext.getContext(); + this.storedServerContext = RpcContext.getServerContext(); } @Override diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java new file mode 100644 index 00000000000..2468837958c --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureContext.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +import java.util.concurrent.CompletableFuture; + +/** + * Used for async call scenario. But if the method you are calling has a {@link CompletableFuture} signature + * you do not need to use this class since you will get a Future response directly. + *

+ * Remember to save the Future reference before making another call using the same thread, otherwise, + * the current Future will be override by the new one, which means you will lose the chance get the return value. + */ +public class FutureContext { + + public static ThreadLocal> futureTL = new ThreadLocal<>(); + + /** + * get future. + * + * @param + * @return future + */ + @SuppressWarnings("unchecked") + public static CompletableFuture getCompletableFuture() { + return (CompletableFuture) futureTL.get(); + } + + /** + * set future. + * + * @param future + */ + public static void setFuture(CompletableFuture future) { + futureTL.set(future); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java index 7c10081e5f2..e1725d20100 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java @@ -70,7 +70,6 @@ protected RpcContext initialValue() { private final Map attachments = new HashMap(); private final Map values = new HashMap(); - private Future future; private List urls; @@ -136,31 +135,6 @@ public static void restoreContext(RpcContext oldContext) { LOCAL.set(oldContext); } - - public RpcContext copyOf() { - RpcContext copy = new RpcContext(); - copy.attachments.putAll(this.attachments); - copy.values.putAll(this.values); - copy.future = this.future; - copy.urls = this.urls; - copy.url = this.url; - copy.methodName = this.methodName; - copy.parameterTypes = this.parameterTypes; - copy.arguments = this.arguments; - copy.localAddress = this.localAddress; - copy.remoteAddress = this.remoteAddress; - copy.invokers = this.invokers; - copy.invoker = this.invoker; - copy.invocation = this.invocation; - - copy.request = this.request; - copy.response = this.response; - copy.asyncContext = this.asyncContext; - - return copy; - } - - /** * remove context. * @@ -242,7 +216,7 @@ public boolean isConsumerSide() { */ @SuppressWarnings("unchecked") public CompletableFuture getCompletableFuture() { - return (CompletableFuture) future; + return FutureContext.getCompletableFuture(); } /** @@ -253,7 +227,7 @@ public CompletableFuture getCompletableFuture() { */ @SuppressWarnings("unchecked") public Future getFuture() { - return (Future) future; + return FutureContext.getCompletableFuture(); } /** @@ -261,8 +235,8 @@ public Future getFuture() { * * @param future */ - public void setFuture(Future future) { - this.future = future; + public void setFuture(CompletableFuture future) { + FutureContext.setFuture(future); } public List getUrls() { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java index 4c49336ece4..06d5bde0ec9 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java @@ -49,13 +49,10 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept ((RpcInvocation) invocation).setInvoker(invoker); } try { - // TODO should we clear server context? RpcContext.removeServerContext(); return invoker.invoke(invocation); } finally { - // TODO Call removeContext? but we need to save future for RpcContext.getFuture(). - // TODO By calling clearAttachments, attachments will not be available when onResponse is invoked. - RpcContext.getContext().clearAttachments(); + RpcContext.removeContext(); } }