From 539ea09ed4acd504b0429cbd7f1748aab4b2cbbc Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 23 May 2019 11:37:09 +0800 Subject: [PATCH 1/3] register callback once for the full filter chain. --- .../org/apache/dubbo/rpc/AsyncRpcResult.java | 18 ++--- .../filter/CallbackRegistrationFilter.java | 65 +++++++++++++++++++ .../rpc/protocol/ProtocolFilterWrapper.java | 22 +++---- .../internal/org.apache.dubbo.rpc.Filter | 3 +- 4 files changed, 80 insertions(+), 28 deletions(-) create mode 100644 dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java index 9546e2a39ce..fc262475e88 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java @@ -137,20 +137,10 @@ public Object recreate() throws Throwable { } public Result thenApplyWithContext(Function fn) { - CompletableFuture future = this.thenApply(fn.compose(beforeContext).andThen(afterContext)); - AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this); - nextAsyncRpcResult.subscribeTo(future); - return nextAsyncRpcResult; - } - - public void subscribeTo(CompletableFuture future) { - future.whenComplete((obj, t) -> { - if (t != null) { - this.completeExceptionally(t); - } else { - this.complete((Result) obj); - } - }); + this.thenApply(fn.compose(beforeContext).andThen(afterContext)); + // You may need to return a new Result instance representing the next async stage, + // like thenApply will return a new CompletableFuture. + return this; } @Override diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java new file mode 100644 index 00000000000..0eb2e468345 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.ListenableFilter; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import java.util.List; + +@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER}, order = -999999) +public class CallbackRegistrationFilter implements Filter { + + private List filters; + + public CallbackRegistrationFilter() { + } + + public void setFilters(List filters) { + this.filters = filters; + } + + @Override + public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { + Result asyncResult = invoker.invoke(invocation); + + asyncResult.thenApplyWithContext(r -> { + for (int i = filters.size() - 1; i >= 0; i--) { + Filter filter = filters.get(i); + // onResponse callback + if (filter instanceof ListenableFilter) { + Listener listener = ((ListenableFilter) filter).listener(); + if (listener != null) { + listener.onResponse(r, invoker, invocation); + } + } else { + filter.onResponse(r, invoker, invocation); + } + } + return r; + }); + + return asyncResult; + } + +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java index 9ee7d3e1f2b..31b25186eed 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java @@ -27,11 +27,11 @@ import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.filter.CallbackRegistrationFilter; import java.util.List; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL; - import static org.apache.dubbo.rpc.Constants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.rpc.Constants.SERVICE_FILTER_KEY; @@ -49,12 +49,19 @@ public ProtocolFilterWrapper(Protocol protocol) { this.protocol = protocol; } + + private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) { Invoker last = invoker; List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); + if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); + // register callback at CallbackRegistrationFilter + if (filter instanceof CallbackRegistrationFilter) { + ((CallbackRegistrationFilter) filter).setFilters(filters); + } final Invoker next = last; last = new Invoker() { @@ -88,18 +95,7 @@ public Result invoke(Invocation invocation) throws RpcException { } throw e; } - return asyncResult.thenApplyWithContext(r -> { - // onResponse callback - if (filter instanceof ListenableFilter) { - Filter.Listener listener = ((ListenableFilter) filter).listener(); - if (listener != null) { - listener.onResponse(r, invoker, invocation); - } - } else { - filter.onResponse(r, invoker, invocation); - } - return r; - }); + return asyncResult; } @Override diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 376f966e7ba..24065214f46 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -11,4 +11,5 @@ exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter compatible=org.apache.dubbo.rpc.filter.CompatibleFilter -timeout=org.apache.dubbo.rpc.filter.TimeoutFilter \ No newline at end of file +timeout=org.apache.dubbo.rpc.filter.TimeoutFilter +callback-registration=org.apache.dubbo.rpc.filter.CallbackRegistrationFilter \ No newline at end of file From dcf0a700c3f97df6b76820dbe47e36361093c9f6 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 23 May 2019 17:14:20 +0800 Subject: [PATCH 2/3] use Invoker to replace Filter --- .../org/apache/dubbo/rpc/AppResponse.java | 18 +++++ .../filter/CallbackRegistrationFilter.java | 65 ------------------- .../rpc/protocol/ProtocolFilterWrapper.java | 61 +++++++++++++++-- 3 files changed, 73 insertions(+), 71 deletions(-) delete mode 100644 dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java index 16e938095a5..6657b79b407 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java @@ -17,6 +17,7 @@ package org.apache.dubbo.rpc; import java.io.Serializable; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -64,6 +65,23 @@ public AppResponse(Throwable exception) { @Override public Object recreate() throws Throwable { if (exception != null) { + // fix issue#619 + try { + // get Throwable class + Class clazz = exception.getClass(); + while (!clazz.getName().equals(Throwable.class.getName())) { + clazz = clazz.getSuperclass(); + } + // get stackTrace value + Field stackTraceField = clazz.getDeclaredField("stackTrace"); + stackTraceField.setAccessible(true); + Object stackTrace = stackTraceField.get(exception); + if (stackTrace == null) { + exception.setStackTrace(new StackTraceElement[0]); + } + } catch (Exception e) { + // ignore + } throw exception; } return result; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java deleted file mode 100644 index 0eb2e468345..00000000000 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CallbackRegistrationFilter.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.filter; - -import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.extension.Activate; -import org.apache.dubbo.rpc.Filter; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.ListenableFilter; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcException; - -import java.util.List; - -@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER}, order = -999999) -public class CallbackRegistrationFilter implements Filter { - - private List filters; - - public CallbackRegistrationFilter() { - } - - public void setFilters(List filters) { - this.filters = filters; - } - - @Override - public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { - Result asyncResult = invoker.invoke(invocation); - - asyncResult.thenApplyWithContext(r -> { - for (int i = filters.size() - 1; i >= 0; i--) { - Filter filter = filters.get(i); - // onResponse callback - if (filter instanceof ListenableFilter) { - Listener listener = ((ListenableFilter) filter).listener(); - if (listener != null) { - listener.onResponse(r, invoker, invocation); - } - } else { - filter.onResponse(r, invoker, invocation); - } - } - return r; - }); - - return asyncResult; - } - -} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java index 31b25186eed..f54d07688b8 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java @@ -27,7 +27,6 @@ import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.filter.CallbackRegistrationFilter; import java.util.List; @@ -58,10 +57,6 @@ private static Invoker buildInvokerChain(final Invoker invoker, String if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); - // register callback at CallbackRegistrationFilter - if (filter instanceof CallbackRegistrationFilter) { - ((CallbackRegistrationFilter) filter).setFilters(filters); - } final Invoker next = last; last = new Invoker() { @@ -110,7 +105,8 @@ public String toString() { }; } } - return last; + + return new CallbackRegistrationInvoker<>(last, filters); } @Override @@ -139,4 +135,57 @@ public void destroy() { protocol.destroy(); } + static class CallbackRegistrationInvoker implements Invoker { + + private final Invoker filterInvoker; + private final List filters; + + public CallbackRegistrationInvoker(Invoker filterInvoker, List filters) { + this.filterInvoker = filterInvoker; + this.filters = filters; + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult = filterInvoker.invoke(invocation); + + asyncResult.thenApplyWithContext(r -> { + for (int i = filters.size() - 1; i >= 0; i--) { + Filter filter = filters.get(i); + // onResponse callback + if (filter instanceof ListenableFilter) { + Filter.Listener listener = ((ListenableFilter) filter).listener(); + if (listener != null) { + listener.onResponse(r, filterInvoker, invocation); + } + } else { + filter.onResponse(r, filterInvoker, invocation); + } + } + return r; + }); + + return asyncResult; + } + + @Override + public Class getInterface() { + return filterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return filterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return filterInvoker.isAvailable(); + } + + @Override + public void destroy() { + filterInvoker.destroy(); + } + } } From a65dad84875208e31547240f2d1565a582e5b9bc Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Thu, 23 May 2019 17:19:04 +0800 Subject: [PATCH 3/3] revert unrelated commit --- .../java/org/apache/dubbo/rpc/AppResponse.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java index 6657b79b407..16e938095a5 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java @@ -17,7 +17,6 @@ package org.apache.dubbo.rpc; import java.io.Serializable; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -65,23 +64,6 @@ public AppResponse(Throwable exception) { @Override public Object recreate() throws Throwable { if (exception != null) { - // fix issue#619 - try { - // get Throwable class - Class clazz = exception.getClass(); - while (!clazz.getName().equals(Throwable.class.getName())) { - clazz = clazz.getSuperclass(); - } - // get stackTrace value - Field stackTraceField = clazz.getDeclaredField("stackTrace"); - stackTraceField.setAccessible(true); - Object stackTrace = stackTraceField.get(exception); - if (stackTrace == null) { - exception.setStackTrace(new StackTraceElement[0]); - } - } catch (Exception e) { - // ignore - } throw exception; } return result;