Skip to content

Commit

Permalink
Fixes apache#3620, provider attachment lose on consumer side, fix thi…
Browse files Browse the repository at this point in the history
…s by reverting RpcContext copy
  • Loading branch information
chickenlj committed Mar 19, 2019
1 parent 30e12a4 commit 940efe5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class AsyncRpcResult implements Result {
private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class);

/**
* RpcContext can be changed, because thread may have been used by other thread. It should be cloned before store.
* So we use Invocation instead, Invocation will create for every invoke, but invocation only support attachments of string type.
* RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call.
* So we should keep the reference of current RpcContext instance and restore it before callback being executed.
*/
private RpcContext storedContext;
private RpcContext storedServerContext;
Expand All @@ -39,18 +39,14 @@ public class AsyncRpcResult implements Result {
private Invocation invocation;

public AsyncRpcResult(CompletableFuture<Result> future) {
this.resultFuture = future;
// employ copy of context avoid the other call may modify the context content
this.storedContext = RpcContext.getContext().copyOf();
this.storedServerContext = RpcContext.getServerContext().copyOf();
this(future, null);
}

public AsyncRpcResult(CompletableFuture<Result> future, Invocation invocation) {
this.resultFuture = future;
this.invocation = invocation;
// employ copy of context avoid the other call may modify the context content
this.storedContext = RpcContext.getContext().copyOf();
this.storedServerContext = RpcContext.getServerContext().copyOf();
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;

import java.util.concurrent.CompletableFuture;

/**
* Used for async call scenario. But if the method you are calling has a {@link CompletableFuture<?>} signature
* you do not need to use this class since you will get a Future response directly.
* <p>
* Remember to save the Future reference before making another call using the same thread, otherwise,
* the current Future will be override by the new one, which means you will lose the chance get the return value.
*/
public class FutureContext {

public static ThreadLocal<CompletableFuture<?>> futureTL = new ThreadLocal<>();

/**
* get future.
*
* @param <T>
* @return future
*/
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> getCompletableFuture() {
return (CompletableFuture<T>) futureTL.get();
}

/**
* set future.
*
* @param future
*/
public static void setFuture(CompletableFuture<?> future) {
futureTL.set(future);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ protected RpcContext initialValue() {

private final Map<String, String> attachments = new HashMap<String, String>();
private final Map<String, Object> values = new HashMap<String, Object>();
private Future<?> future;

private List<URL> urls;

Expand Down Expand Up @@ -136,31 +135,6 @@ public static void restoreContext(RpcContext oldContext) {
LOCAL.set(oldContext);
}


public RpcContext copyOf() {
RpcContext copy = new RpcContext();
copy.attachments.putAll(this.attachments);
copy.values.putAll(this.values);
copy.future = this.future;
copy.urls = this.urls;
copy.url = this.url;
copy.methodName = this.methodName;
copy.parameterTypes = this.parameterTypes;
copy.arguments = this.arguments;
copy.localAddress = this.localAddress;
copy.remoteAddress = this.remoteAddress;
copy.invokers = this.invokers;
copy.invoker = this.invoker;
copy.invocation = this.invocation;

copy.request = this.request;
copy.response = this.response;
copy.asyncContext = this.asyncContext;

return copy;
}


/**
* remove context.
*
Expand Down Expand Up @@ -242,7 +216,7 @@ public boolean isConsumerSide() {
*/
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> getCompletableFuture() {
return (CompletableFuture<T>) future;
return FutureContext.getCompletableFuture();
}

/**
Expand All @@ -253,16 +227,16 @@ public <T> CompletableFuture<T> getCompletableFuture() {
*/
@SuppressWarnings("unchecked")
public <T> Future<T> getFuture() {
return (Future<T>) future;
return FutureContext.getCompletableFuture();
}

/**
* set future.
*
* @param future
*/
public void setFuture(Future<?> future) {
this.future = future;
public void setFuture(CompletableFuture<?> future) {
FutureContext.setFuture(future);
}

public List<URL> getUrls() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
// TODO should we clear server context?
RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
// TODO Call removeContext? but we need to save future for RpcContext.getFuture().
// TODO By calling clearAttachments, attachments will not be available when onResponse is invoked.
RpcContext.getContext().clearAttachments();
RpcContext.removeContext();
}
}

Expand Down

0 comments on commit 940efe5

Please sign in to comment.