diff --git a/CHANGES.md b/CHANGES.md index 7c04d5b87e8..0efe5519f8c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,10 +6,10 @@ 2. Asset transfer to ASF, includeing pom, license, DISCLAIMER and so on, #1491 3. Introduce of new dispatcher policy: EagerThreadpool, #1568 4. Separate monitor data with group and version, #1407 -5. Spring Boot Enhancenment, #1611 -6. Gaceful shutdown enhancement +5. Spring Boot Enhancement, #1611 +6. Graceful shutdown enhancement - Remove exporter destroy logic in AnnotationBean. - Waiting for registry notification on consumer side by checking channel state. 7. Simplify consumer/provider side check in RpcContext, #1444. -Issues and Pull Requests, check [milestone-2.6.2](https://github.com/apache/incubator-dubbo/milestone/15). \ No newline at end of file +Issues and Pull Requests, check [milestone-2.6.2](https://github.com/apache/incubator-dubbo/milestone/15). diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index bd7dc2022cd..b987b16e658 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -63,4 +63,14 @@ Thanks for contributing! ### Code style We provide a template file [dubbo_codestyle_for_idea.xml](https://github.com/apache/incubator-dubbo/tree/master/codestyle/dubbo_codestyle_for_idea.xml) for IntelliJ idea, you can import it to you IDE. -If you use Eclipse you can config manually by referencing the same file. \ No newline at end of file +If you use Eclipse you can config manually by referencing the same file. + +**NOTICE** + +It is very important to set the dubbo_codestyle_for_idea.xml, otherwise you will fail to pass the Travis CI. Steps to set the code style are as below: + +1. Enter `Editor > Code Style` +2. To manage a code style scheme, in the Code Style page, select the desired scheme from the drop-down list, and click ![manage profiles](codestyle/manage_profiles.png). +From the drop-down list, select `Import Scheme`, then select this option `IntelliJ IDEA code style XML` to import scheme +3. In the Scheme field, type the name of the new scheme and press ⏎ to save the changes. + diff --git a/FAQ.md b/FAQ.md deleted file mode 100644 index 28e2f0cc564..00000000000 --- a/FAQ.md +++ /dev/null @@ -1,25 +0,0 @@ -### Where is dubbo-admin? - -dubbo-admin has been moved from core repository to https://github.com/apache/incubator-dubbo-ops since 2.6.1 - -### Which version should I choose? - -Currently, dubbo keeps 3 versions evolve in parallel: - -* 2.7.x (master): requires Java 1.8, major feature branch. - -* 2.6.x: requires Java 1.6, minor feature & bugfix branch, GA, production ready. - -* 2.5.x: requires Java 1.6, maintenance branch, only accept security vulnerability and critical bugfix, expected to be EOL soon. - -check [this](https://github.com/apache/incubator-dubbo/issues/1208) for detailed version management plan. - -For contributors, please make sure all changes on the right branch, that is, most of the pull request should go to 2.7.x, and be backported to 2.6.x and 2.5.x if necessary. If the fix is specific to a branch, please make sure your pull request goes to the right branch. - -For committers, make sure select the right label and target branch for every PR, and don't forget to back port the fix to lower version is necessary. - -#### How to register ip correctly in docker? - -[Example question](https://github.com/alibaba/dubbo/issues/742) - -Dubbo supports specifying ip/port via system environment variables, examples can be found [here](https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-docker). diff --git a/PULL_REQUEST_TEMPLATE.md b/PULL_REQUEST_TEMPLATE.md index 5427becaba5..8cb19f93455 100644 --- a/PULL_REQUEST_TEMPLATE.md +++ b/PULL_REQUEST_TEMPLATE.md @@ -16,5 +16,5 @@ Follow this checklist to help us incorporate your contribution quickly and easil - [ ] Format the pull request title like `[Dubbo-XXX] Fix UnknownException when host config not exist #XXX`. Each commit in the pull request should have a meaningful subject line and body. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/incubator-dubbo/tree/master/dubbo-test). -- [ ] Run `mvn clean install -DskipTests` & `mvn clean test-compile failsafe:integration-test` to make sure unit-test and integration-test pass. +- [ ] Run `mvn clean install -DskipTests=false` & `mvn clean test-compile failsafe:integration-test` to make sure unit-test and integration-test pass. - [ ] If this contribution is large, please follow the [Software Donation Guide](https://github.com/apache/incubator-dubbo/wiki/Software-donation-guide). diff --git a/README.md b/README.md index b5906c7f46b..444ec822095 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ The consumer will print out `Hello world` on the screen. * [Your first Dubbo application](http://dubbo.apache.org/en-us/blog/dubbo-101.html) - A 101 tutorial to reveal more details, with the same code above. * [Dubbo user manual](http://dubbo.apache.org/en-us/docs/user/preface/background.html) - How to use Dubbo and all its features. -* [Dubbo developer guide](http://dubbo.apache.org/en-us/docs/dev/build.html) - How to invovle in Dubbo development. +* [Dubbo developer guide](http://dubbo.apache.org/en-us/docs/dev/build.html) - How to involve in Dubbo development. * [Dubbo admin manual](http://dubbo.apache.org/en-us/docs/admin/install/provider-demo.html) - How to admin and manage Dubbo services. ## Contact diff --git a/codestyle/manage_profiles.png b/codestyle/manage_profiles.png new file mode 100644 index 00000000000..1664d67ea43 Binary files /dev/null and b/codestyle/manage_profiles.png differ diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java index 7a8d0aaaba7..feb9202d50f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java @@ -26,10 +26,17 @@ /** * AbstractLoadBalance - * */ public abstract class AbstractLoadBalance implements LoadBalance { - + /** + * Calculate the weight according to the uptime proportion of warmup time + * the new weight will be within 1(inclusive) to weight(inclusive) + * + * @param uptime the uptime in milliseconds + * @param warmup the warmup time in milliseconds + * @param weight the weight of an invoker + * @return weight which takes warmup into account + */ static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); @@ -48,6 +55,15 @@ public Invoker select(List> invokers, URL url, Invocation invo protected abstract Invoker doSelect(List> invokers, URL url, Invocation invocation); + + /** + * Get the weight of the invoker's invocation which takes warmup time into account + * if the uptime is within the warmup time, the weight will be reduce proportionally + * + * @param invoker the invoker + * @param invocation the invocation of this invoker + * @return weight + */ protected int getWeight(Invoker invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { @@ -60,7 +76,7 @@ protected int getWeight(Invoker invoker, Invocation invocation) { } } } - return weight; + return weight >= 0 ? weight : 0; } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java index 860dd3d0a13..784195877be 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java @@ -46,6 +46,8 @@ protected Invoker doSelect(List> invokers, URL url, Invocation int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexes = new int[length]; + // the weight of every invokers + int[] weights = new int[length]; // The sum of the warmup weights of all the least active invokes int totalWeight = 0; // The weight of the first least active invoke @@ -53,6 +55,7 @@ protected Invoker doSelect(List> invokers, URL url, Invocation // Every least active invoker has the same weight value? boolean sameWeight = true; + // Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); @@ -60,6 +63,8 @@ protected Invoker doSelect(List> invokers, URL url, Invocation int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoke configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); + // save for later use + weights[i] = afterWarmup; // If it is the first invoker or the active number of the invoker is less than the current least active number if (leastActive == -1 || active < leastActive) { // Reset the active number of the current invoker to the least active number @@ -95,12 +100,12 @@ protected Invoker doSelect(List> invokers, URL url, Invocation if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. - int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1; + int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; - offsetWeight -= getWeight(invokers.get(leastIndex), invocation); - if (offsetWeight <= 0) { + offsetWeight -= weights[leastIndex]; + if (offsetWeight < 0) { return invokers.get(leastIndex); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java index eea0ca319db..88453ad7547 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java @@ -25,7 +25,6 @@ /** * random load balance. - * */ public class RandomLoadBalance extends AbstractLoadBalance { @@ -33,13 +32,23 @@ public class RandomLoadBalance extends AbstractLoadBalance { @Override protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { - int length = invokers.size(); // Number of invokers - boolean sameWeight = true; // Every invoker has the same weight? + // Number of invokers + int length = invokers.size(); + // Every invoker has the same weight? + boolean sameWeight = true; + // the weight of every invokers + int[] weights = new int[length]; + // the first invoker's weight int firstWeight = getWeight(invokers.get(0), invocation); - int totalWeight = firstWeight; // The sum of weights + weights[0] = firstWeight; + // The sum of weights + int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); - totalWeight += weight; // Sum + // save for later use + weights[i] = weight; + // Sum + totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } @@ -49,7 +58,7 @@ protected Invoker doSelect(List> invokers, URL url, Invocation int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { - offset -= getWeight(invokers.get(i), invocation); + offset -= weights[i]; if (offset < 0) { return invokers.get(i); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java index 242a2ffc23f..d30ebe1e5c3 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java @@ -101,9 +101,7 @@ protected Invoker doSelect(List> invokers, URL url, Invocation String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); int weight = getWeight(invoker, invocation); - if (weight < 0) { - weight = 0; - } + if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ArrayMerger.java index d6e58778ba6..166bec2d916 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ArrayMerger.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; import java.lang.reflect.Array; @@ -25,34 +26,48 @@ public class ArrayMerger implements Merger { public static final ArrayMerger INSTANCE = new ArrayMerger(); @Override - public Object[] merge(Object[]... others) { - if (others.length == 0) { - return null; + public Object[] merge(Object[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new Object[0]; } + + int i = 0; + while (i < items.length && items[i] == null) { + i++; + } + + if (i == items.length) { + return new Object[0]; + } + + Class type = items[i].getClass().getComponentType(); + int totalLen = 0; - for (int i = 0; i < others.length; i++) { - Object item = others[i]; - if (item != null && item.getClass().isArray()) { - totalLen += Array.getLength(item); - } else { - throw new IllegalArgumentException((i + 1) + "th argument is not an array"); + for (; i < items.length; i++) { + if (items[i] == null) { + continue; + } + Class itemType = items[i].getClass().getComponentType(); + if (itemType != type) { + throw new IllegalArgumentException("Arguments' types are different"); } + totalLen += items[i].length; } if (totalLen == 0) { - return null; + return new Object[0]; } - Class type = others[0].getClass().getComponentType(); - Object result = Array.newInstance(type, totalLen); + int index = 0; - for (Object array : others) { - for (int i = 0; i < Array.getLength(array); i++) { - Array.set(result, index++, Array.get(array, i)); + for (Object[] array : items) { + if (array != null) { + for (int j = 0; j < array.length; j++) { + Array.set(result, index++, array[j]); + } } } return (Object[]) result; } - } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/BooleanArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/BooleanArrayMerger.java index 090f8b13f7f..13e5f558abe 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/BooleanArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/BooleanArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class BooleanArrayMerger implements Merger { @Override public boolean[] merge(boolean[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new boolean[0]; + } int totalLen = 0; for (boolean[] array : items) { - totalLen += array.length; + if (array != null) { + totalLen += array.length; + } } boolean[] result = new boolean[totalLen]; int index = 0; for (boolean[] array : items) { - for (boolean item : array) { - result[index++] = item; + if (array != null) { + for (boolean item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ByteArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ByteArrayMerger.java index c135bb34cc4..963fdddc465 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ByteArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ByteArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class ByteArrayMerger implements Merger { @Override public byte[] merge(byte[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new byte[0]; + } int total = 0; for (byte[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } byte[] result = new byte[total]; int index = 0; for (byte[] array : items) { - for (byte item : array) { - result[index++] = item; + if (array != null) { + for (byte item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/CharArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/CharArrayMerger.java index e9a3dbcf197..76302cf0990 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/CharArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/CharArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class CharArrayMerger implements Merger { @Override public char[] merge(char[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new char[0]; + } int total = 0; for (char[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } char[] result = new char[total]; int index = 0; for (char[] array : items) { - for (char item : array) { - result[index++] = item; + if (array != null) { + for (char item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/DoubleArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/DoubleArrayMerger.java index c42e2252764..8026d8df001 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/DoubleArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/DoubleArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class DoubleArrayMerger implements Merger { @Override public double[] merge(double[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new double[0]; + } int total = 0; for (double[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } double[] result = new double[total]; int index = 0; for (double[] array : items) { - for (double item : array) { - result[index++] = item; + if (array != null) { + for (double item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/FloatArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/FloatArrayMerger.java index d6a9e8b4642..f8186ae8e6c 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/FloatArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/FloatArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class FloatArrayMerger implements Merger { @Override public float[] merge(float[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new float[0]; + } int total = 0; for (float[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } float[] result = new float[total]; int index = 0; for (float[] array : items) { - for (float item : array) { - result[index++] = item; + if (array != null) { + for (float item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/IntArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/IntArrayMerger.java index 7be893997d2..6d49834b34a 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/IntArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/IntArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class IntArrayMerger implements Merger { @Override public int[] merge(int[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new int[0]; + } int totalLen = 0; - for (int[] item : items) { - totalLen += item.length; + for (int[] array : items) { + if (array != null) { + totalLen += array.length; + } } int[] result = new int[totalLen]; int index = 0; - for (int[] item : items) { - for (int i : item) { - result[index++] = i; + for (int[] array : items) { + if (array != null) { + for (int item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ListMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ListMerger.java index 9ec2fe92510..9c81854f3ab 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ListMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ListMerger.java @@ -17,15 +17,20 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class ListMerger implements Merger> { @Override public List merge(List... items) { + if (ArrayUtils.isEmpty(items)) { + return Collections.emptyList(); + } List result = new ArrayList(); for (List item : items) { if (item != null) { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/LongArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/LongArrayMerger.java index 40ab0257fa4..52b090f002f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/LongArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/LongArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class LongArrayMerger implements Merger { @Override public long[] merge(long[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new long[0]; + } int total = 0; for (long[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } long[] result = new long[total]; int index = 0; for (long[] array : items) { - for (long item : array) { - result[index++] = item; + if (array != null) { + for (long item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MapMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MapMerger.java index 3e11e8bad43..9598ba7a257 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MapMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MapMerger.java @@ -16,8 +16,10 @@ */ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -25,8 +27,8 @@ public class MapMerger implements Merger> { @Override public Map merge(Map... items) { - if (items.length == 0) { - return null; + if (ArrayUtils.isEmpty(items)) { + return Collections.emptyMap(); } Map result = new HashMap(); for (Map item : items) { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MergerFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MergerFactory.java index 9264fb7c15d..845d21a6cd0 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MergerFactory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/MergerFactory.java @@ -30,7 +30,19 @@ public class MergerFactory { private static final ConcurrentMap, Merger> mergerCache = new ConcurrentHashMap, Merger>(); + /** + * Find the merger according to the returnType class, the merger will + * merge an array of returnType into one + * + * @param returnType the merger will return this type + * @return the merger which merges an array of returnType into one, return null if not exist + * @throws IllegalArgumentException if returnType is null + */ public static Merger getMerger(Class returnType) { + if (returnType == null) { + throw new IllegalArgumentException("returnType is null"); + } + Merger result; if (returnType.isArray()) { Class type = returnType.getComponentType(); diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/SetMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/SetMerger.java index 558d5853ccd..d2b7aeb5e86 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/SetMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/SetMerger.java @@ -16,8 +16,10 @@ */ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -25,7 +27,9 @@ public class SetMerger implements Merger> { @Override public Set merge(Set... items) { - + if (ArrayUtils.isEmpty(items)) { + return Collections.emptySet(); + } Set result = new HashSet(); for (Set item : items) { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ShortArrayMerger.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ShortArrayMerger.java index 3cbce9be51b..8ad5182a77f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ShortArrayMerger.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/merger/ShortArrayMerger.java @@ -17,21 +17,29 @@ package org.apache.dubbo.rpc.cluster.merger; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.rpc.cluster.Merger; public class ShortArrayMerger implements Merger { @Override public short[] merge(short[]... items) { + if (ArrayUtils.isEmpty(items)) { + return new short[0]; + } int total = 0; for (short[] array : items) { - total += array.length; + if (array != null) { + total += array.length; + } } short[] result = new short[total]; int index = 0; for (short[] array : items) { - for (short item : array) { - result[index++] = item; + if (array != null) { + for (short item : array) { + result[index++] = item; + } } } return result; diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java index 7281fdbdebb..278725f80bb 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java @@ -19,22 +19,36 @@ import org.apache.dubbo.rpc.Invoker; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +@SuppressWarnings("rawtypes") public class ConsistentHashLoadBalanceTest extends LoadBalanceBaseTest { - @Ignore - @Test - public void testConsistentHashLoadBalance() { - int runs = 10000; - Map counter = getInvokeCounter(runs, ConsistentHashLoadBalance.NAME); - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); - Assert.assertTrue("abs diff should < avg", Math.abs(count - runs / (0f + invokers.size())) < runs / (0f + invokers.size())); - } - } + + @Test + public void testConsistentHashLoadBalance() { + int runs = 10000; + long unHitedInvokerCount = 0; + Map hitedInvokers = new HashMap<>(); + Map counter = getInvokeCounter(runs, ConsistentHashLoadBalance.NAME); + for (Invoker minvoker : counter.keySet()) { + Long count = counter.get(minvoker).get(); + + if (count == 0) { + unHitedInvokerCount++; + } else { + hitedInvokers.put(minvoker, count); + } + } + + Assert.assertEquals("the number of unHitedInvoker should be counter.size() - 1", counter.size() - 1, + unHitedInvokerCount); + Assert.assertEquals("the number of hitedInvoker should be 1", 1, hitedInvokers.size()); + Assert.assertEquals("the number of hited count should be the number of runs", runs, + hitedInvokers.values().iterator().next().intValue()); + } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java index 836994fdcc8..c54e9a00304 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java @@ -30,8 +30,8 @@ public class LeastActiveBalanceTest extends LoadBalanceBaseTest { public void testLeastActiveLoadBalance_select() { int runs = 10000; Map counter = getInvokeCounter(runs, LeastActiveLoadBalance.NAME); - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); + for (Map.Entry entry : counter.entrySet()) { + Long count = entry.getValue().get(); // System.out.println(count); Assert.assertTrue("abs diff shoud < avg", Math.abs(count - runs / (0f + invokers.size())) < runs / (0f + invokers.size())); diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java index f2af3d39dff..2248aef376a 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java @@ -75,6 +75,7 @@ public void setUp() throws Exception { invocation = mock(Invocation.class); given(invocation.getMethodName()).willReturn("method1"); + given(invocation.getArguments()).willReturn(new Object[] {"arg1","arg2","arg3"}); invoker1 = mock(Invoker.class); invoker2 = mock(Invoker.class); diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java index d31a85d08d4..3b65ee39580 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java @@ -32,8 +32,8 @@ public class RandomLoadBalanceTest extends LoadBalanceBaseTest { public void testRandomLoadBalanceSelect() { int runs = 1000; Map counter = getInvokeCounter(runs, RandomLoadBalance.NAME); - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); + for (Map.Entry entry : counter.entrySet()) { + Long count = entry.getValue().get(); Assert.assertTrue("abs diff should < avg", Math.abs(count - runs / (0f + invokers.size())) < runs / (0f + invokers.size())); } @@ -43,8 +43,8 @@ public void testRandomLoadBalanceSelect() { } } counter = getInvokeCounter(runs, LeastActiveLoadBalance.NAME); - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); + for (Map.Entry entry : counter.entrySet()) { + Long count = entry.getValue().get(); } Assert.assertEquals(runs, counter.get(invoker1).intValue()); Assert.assertEquals(0, counter.get(invoker2).intValue()); diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java index 5242f90bada..93c89eff978 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java @@ -47,8 +47,8 @@ private void assertStrictWRRResult(int loop, Map resultMa public void testRoundRobinLoadBalanceSelect() { int runs = 10000; Map counter = getInvokeCounter(runs, RoundRobinLoadBalance.NAME); - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); + for (Map.Entry entry : counter.entrySet()) { + Long count = entry.getValue().get(); Assert.assertTrue("abs diff should < 1", Math.abs(count - runs / (0f + invokers.size())) < 1f); } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/merger/ResultMergerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/merger/ResultMergerTest.java index f5044f305e6..e4c30a4553f 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/merger/ResultMergerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/merger/ResultMergerTest.java @@ -28,18 +28,45 @@ import java.util.Set; public class ResultMergerTest { + + /** + * MergerFactory test + */ + @Test + public void testMergerFactoryIllegalArgumentException() { + try { + MergerFactory.getMerger(null); + Assert.fail("expected IllegalArgumentException for null argument"); + } catch (IllegalArgumentException exception) { + Assert.assertEquals("returnType is null", exception.getMessage()); + } + } + + /** + * ArrayMerger test + */ + @Test + public void testArrayMergerIllegalArgumentException() { + String[] stringArray = {"1", "2", "3"}; + Integer[] integerArray = {3, 4, 5}; + try { + Object result = ArrayMerger.INSTANCE.merge(stringArray, null, integerArray); + Assert.fail("expected IllegalArgumentException for different arguments' types"); + } catch (IllegalArgumentException exception) { + Assert.assertEquals("Arguments' types are different", exception.getMessage()); + } + } + /** * ArrayMerger test - * - * @throws Exception */ @Test - public void testArrayMerger() throws Exception { + public void testArrayMerger() { String[] stringArray1 = {"1", "2", "3"}; String[] stringArray2 = {"4", "5", "6"}; String[] stringArray3 = {}; - Object result = ArrayMerger.INSTANCE.merge(stringArray1, stringArray2, stringArray3); + Object result = ArrayMerger.INSTANCE.merge(stringArray1, stringArray2, stringArray3, null); Assert.assertTrue(result.getClass().isArray()); Assert.assertEquals(6, Array.getLength(result)); Assert.assertTrue(String.class.isInstance(Array.get(result, 0))); @@ -47,139 +74,170 @@ public void testArrayMerger() throws Exception { Assert.assertEquals(String.valueOf(i + 1), Array.get(result, i)); } - int[] intArray1 = {1, 2, 3}; - int[] intArray2 = {4, 5, 6}; - int[] intArray3 = {7}; - result = MergerFactory.getMerger(int[].class).merge(intArray1, intArray2, intArray3); + Integer[] intArray1 = {1, 2, 3}; + Integer[] intArray2 = {4, 5, 6}; + Integer[] intArray3 = {7}; + // trigger ArrayMerger + result = MergerFactory.getMerger(Integer[].class).merge(intArray1, intArray2, intArray3, null); Assert.assertTrue(result.getClass().isArray()); Assert.assertEquals(7, Array.getLength(result)); - Assert.assertTrue(int.class == result.getClass().getComponentType()); + Assert.assertTrue(Integer.class == result.getClass().getComponentType()); for (int i = 0; i < 7; i++) { Assert.assertEquals(i + 1, Array.get(result, i)); } + result = ArrayMerger.INSTANCE.merge(null); + Assert.assertEquals(0, Array.getLength(result)); + + result = ArrayMerger.INSTANCE.merge(null, null); + Assert.assertEquals(0, Array.getLength(result)); + + result = ArrayMerger.INSTANCE.merge(null, new Object[0]); + Assert.assertEquals(0, Array.getLength(result)); } /** * BooleanArrayMerger test - * - * @throws Exception */ @Test - public void testBooleanArrayMerger() throws Exception { + public void testBooleanArrayMerger() { boolean[] arrayOne = {true, false}; boolean[] arrayTwo = {false}; - boolean[] result = MergerFactory.getMerger(boolean[].class).merge(arrayOne, arrayTwo); + boolean[] result = MergerFactory.getMerger(boolean[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(3, result.length); boolean[] mergedResult = {true, false, false}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertEquals(mergedResult[i], result[i]); } + + result = MergerFactory.getMerger(boolean[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(boolean[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * ByteArrayMerger test - * - * @throws Exception */ @Test - public void testByteArrayMerger() throws Exception { + public void testByteArrayMerger() { byte[] arrayOne = {1, 2}; byte[] arrayTwo = {1, 32}; - byte[] result = MergerFactory.getMerger(byte[].class).merge(arrayOne, arrayTwo); + byte[] result = MergerFactory.getMerger(byte[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); byte[] mergedResult = {1, 2, 1, 32}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertEquals(mergedResult[i], result[i]); } + + result = MergerFactory.getMerger(byte[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(byte[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * CharArrayMerger test - * - * @throws Exception */ @Test - public void testCharArrayMerger() throws Exception { + public void testCharArrayMerger() { char[] arrayOne = "hello".toCharArray(); char[] arrayTwo = "world".toCharArray(); - char[] result = MergerFactory.getMerger(char[].class).merge(arrayOne, arrayTwo); + char[] result = MergerFactory.getMerger(char[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(10, result.length); char[] mergedResult = "helloworld".toCharArray(); for (int i = 0; i < mergedResult.length; i++) { Assert.assertEquals(mergedResult[i], result[i]); } + + result = MergerFactory.getMerger(char[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(char[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * DoubleArrayMerger test - * - * @throws Exception */ @Test - public void testDoubleArrayMerger() throws Exception { + public void testDoubleArrayMerger() { double[] arrayOne = {1.2d, 3.5d}; double[] arrayTwo = {2d, 34d}; - double[] result = MergerFactory.getMerger(double[].class).merge(arrayOne, arrayTwo); + double[] result = MergerFactory.getMerger(double[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); double[] mergedResult = {1.2d, 3.5d, 2d, 34d}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertTrue(mergedResult[i] == result[i]); } + + result = MergerFactory.getMerger(double[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(double[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * FloatArrayMerger test - * - * @throws Exception */ @Test - public void testFloatArrayMerger() throws Exception { + public void testFloatArrayMerger() { float[] arrayOne = {1.2f, 3.5f}; float[] arrayTwo = {2f, 34f}; - float[] result = MergerFactory.getMerger(float[].class).merge(arrayOne, arrayTwo); + float[] result = MergerFactory.getMerger(float[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); double[] mergedResult = {1.2f, 3.5f, 2f, 34f}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertTrue(mergedResult[i] == result[i]); } + + result = MergerFactory.getMerger(float[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(float[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * IntArrayMerger test - * - * @throws Exception */ @Test - public void testIntArrayMerger() throws Exception { + public void testIntArrayMerger() { int[] arrayOne = {1, 2}; int[] arrayTwo = {2, 34}; - int[] result = MergerFactory.getMerger(int[].class).merge(arrayOne, arrayTwo); + int[] result = MergerFactory.getMerger(int[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); double[] mergedResult = {1, 2, 2, 34}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertTrue(mergedResult[i] == result[i]); } + + result = MergerFactory.getMerger(int[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(int[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * ListMerger test - * - * @throws Exception */ @Test - public void testListMerger() throws Exception { - List list1 = new ArrayList(){{ + public void testListMerger() { + List list1 = new ArrayList() {{ add(null); add("1"); - add("2"); + add("2"); }}; - List list2 = new ArrayList(){{ + List list2 = new ArrayList() {{ add("3"); add("4"); }}; - List result = MergerFactory.getMerger(List.class).merge(list1, list2); + List result = MergerFactory.getMerger(List.class).merge(list1, list2, null); Assert.assertEquals(5, result.size()); ArrayList expected = new ArrayList() {{ add(null); @@ -189,67 +247,82 @@ public void testListMerger() throws Exception { add("4"); }}; Assert.assertEquals(expected, result); + + result = MergerFactory.getMerger(List.class).merge(null); + Assert.assertEquals(0, result.size()); + + result = MergerFactory.getMerger(List.class).merge(null, null); + Assert.assertEquals(0, result.size()); } /** * LongArrayMerger test - * - * @throws Exception */ @Test - public void testMapArrayMerger() throws Exception { - Map mapOne = new HashMap() {{ + public void testMapArrayMerger() { + Map mapOne = new HashMap() {{ put("11", 222); put("223", 11); }}; - Map mapTwo = new HashMap() {{ + Map mapTwo = new HashMap() {{ put("3333", 3232); put("444", 2323); }}; - Map result = MergerFactory.getMerger(Map.class).merge(mapOne, mapTwo); + Map result = MergerFactory.getMerger(Map.class).merge(mapOne, mapTwo, null); Assert.assertEquals(4, result.size()); - Map mergedResult = new HashMap() {{ + Map mergedResult = new HashMap() {{ put("11", 222); put("223", 11); put("3333", 3232); put("444", 2323); }}; Assert.assertEquals(mergedResult, result); + + result = MergerFactory.getMerger(Map.class).merge(null); + Assert.assertEquals(0, result.size()); + + result = MergerFactory.getMerger(Map.class).merge(null, null); + Assert.assertEquals(0, result.size()); } /** * LongArrayMerger test - * - * @throws Exception */ @Test - public void testLongArrayMerger() throws Exception { + public void testLongArrayMerger() { long[] arrayOne = {1l, 2l}; long[] arrayTwo = {2l, 34l}; - long[] result = MergerFactory.getMerger(long[].class).merge(arrayOne, arrayTwo); + long[] result = MergerFactory.getMerger(long[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); double[] mergedResult = {1l, 2l, 2l, 34l}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertTrue(mergedResult[i] == result[i]); } + + result = MergerFactory.getMerger(long[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(long[].class).merge(null, null); + Assert.assertEquals(0, result.length); } /** * SetMerger test - * - * @throws Exception */ @Test - public void testSetMerger() throws Exception { - Set set1 = new HashSet(); - set1.add(null); - set1.add("1"); - set1.add("2"); - Set set2 = new HashSet(); - set2.add("2"); - set2.add("3"); + public void testSetMerger() { + Set set1 = new HashSet() {{ + add(null); + add("1"); + add("2"); + }}; - Set result = MergerFactory.getMerger(Set.class).merge(set1, set2); + Set set2 = new HashSet() {{ + add("2"); + add("3"); + }}; + + Set result = MergerFactory.getMerger(Set.class).merge(set1, set2, null); Assert.assertEquals(4, result.size()); Assert.assertEquals(new HashSet() { @@ -260,22 +333,32 @@ public void testSetMerger() throws Exception { add("3"); } }, result); + + result = MergerFactory.getMerger(Set.class).merge(null); + Assert.assertEquals(0, result.size()); + + result = MergerFactory.getMerger(Set.class).merge(null, null); + Assert.assertEquals(0, result.size()); } /** * ShortArrayMerger test - * - * @throws Exception */ @Test - public void testShortArrayMerger() throws Exception { + public void testShortArrayMerger() { short[] arrayOne = {1, 2}; short[] arrayTwo = {2, 34}; - short[] result = MergerFactory.getMerger(short[].class).merge(arrayOne, arrayTwo); + short[] result = MergerFactory.getMerger(short[].class).merge(arrayOne, arrayTwo, null); Assert.assertEquals(4, result.length); double[] mergedResult = {1, 2, 2, 34}; for (int i = 0; i < mergedResult.length; i++) { Assert.assertTrue(mergedResult[i] == result[i]); } + + result = MergerFactory.getMerger(short[].class).merge(null); + Assert.assertEquals(0, result.length); + + result = MergerFactory.getMerger(short[].class).merge(null, null); + Assert.assertEquals(0, result.length); } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java index 2039b0a0e4c..0344ee1b1a1 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java @@ -432,10 +432,10 @@ public void testSelectBalance() { counter.get(sinvoker).incrementAndGet(); } - for (Invoker minvoker : counter.keySet()) { - Long count = counter.get(minvoker).get(); + for (Map.Entry entry : counter.entrySet()) { + Long count = entry.getValue().get(); // System.out.println(count); - if (minvoker.isAvailable()) + if (entry.getKey().isAvailable()) Assert.assertTrue("count should > avg", count > runs / invokers.size()); } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/Menu.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/Menu.java index b3564d27ae7..5f2b56143f8 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/Menu.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/Menu.java @@ -30,8 +30,8 @@ public Menu() { } public Menu(Map> menus) { - for (String key : menus.keySet()) { - this.menus.put(key, new ArrayList(menus.get(key))); + for (Map.Entry> entry : menus.entrySet()) { + this.menus.put(entry.getKey(), new ArrayList(entry.getValue())); } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java index 12d6ba41e3c..ba0b617b88c 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java @@ -161,11 +161,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl merge(expected, firstMenuMap); merge(expected, secondMenuMap); assertEquals(expected.keySet(), menu.getMenus().keySet()); - for (String key : expected.keySet()) { + for (Map.Entry> entry : expected.entrySet()) { // FIXME: cannot guarantee the sequence of the merge result, check implementation in // MergeableClusterInvoker#invoke - List values1 = new ArrayList(expected.get(key)); - List values2 = new ArrayList(menu.getMenus().get(key)); + List values1 = new ArrayList(entry.getValue()); + List values2 = new ArrayList(menu.getMenus().get(entry.getKey())); Collections.sort(values1); Collections.sort(values2); assertEquals(values1, values2); diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 9265287a114..c6f8b1f6a7c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -487,11 +487,21 @@ public class Constants { */ public static final String REGISTRY_RETRY_PERIOD_KEY = "retry.period"; + /** + * Most retry times + */ + public static final String REGISTRY_RETRY_TIMES_KEY = "retry.times"; + /** * Default value for the period of retry interval in milliseconds: 5000 */ public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000; + /** + * Default value for the times of retry: 3 + */ + public static final int DEFAULT_REGISTRY_RETRY_TIMES = 3; + /** * Reconnection period in milliseconds for register center */ diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/beanutil/JavaBeanSerializeUtil.java b/dubbo-common/src/main/java/org/apache/dubbo/common/beanutil/JavaBeanSerializeUtil.java index a1bdc7af5eb..995965ac74a 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/beanutil/JavaBeanSerializeUtil.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/beanutil/JavaBeanSerializeUtil.java @@ -144,12 +144,11 @@ private static void serializeInternal(JavaBeanDescriptor descriptor, Object obj, } } else if (obj instanceof Map) { Map map = (Map) obj; - for (Object key : map.keySet()) { - Object value = map.get(key); + map.forEach((key, value) -> { Object keyDescriptor = key == null ? null : createDescriptorIfAbsent(key, accessor, cache); Object valueDescriptor = value == null ? null : createDescriptorIfAbsent(value, accessor, cache); descriptor.setProperty(keyDescriptor, valueDescriptor); - } // ~ end of loop map + });// ~ end of loop map } else { if (JavaBeanAccessor.isAccessByMethod(accessor)) { Map methods = ReflectUtils.getBeanPropertyReadMethods(obj.getClass()); diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java index c39f3b67a59..731e1bb6b02 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java @@ -173,46 +173,7 @@ private void init() { } checkInterfaceAndMethods(interfaceClass, methods); } - String resolve = System.getProperty(interfaceName); - String resolveFile = null; - if (resolve == null || resolve.length() == 0) { - resolveFile = System.getProperty("dubbo.resolve.file"); - if (resolveFile == null || resolveFile.length() == 0) { - File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); - if (userResolveFile.exists()) { - resolveFile = userResolveFile.getAbsolutePath(); - } - } - if (resolveFile != null && resolveFile.length() > 0) { - Properties properties = new Properties(); - FileInputStream fis = null; - try { - fis = new FileInputStream(new File(resolveFile)); - properties.load(fis); - } catch (IOException e) { - throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); - } finally { - try { - if (null != fis) { - fis.close(); - } - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } - } - resolve = properties.getProperty(interfaceName); - } - } - if (resolve != null && resolve.length() > 0) { - url = resolve; - if (logger.isWarnEnabled()) { - if (resolveFile != null) { - logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); - } else { - logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); - } - } - } + resolveFile(); if (consumer != null) { if (application == null) { application = consumer.getApplication(); @@ -529,4 +490,46 @@ public String getUniqueServiceName() { return buf.toString(); } + private void resolveFile() { + String resolve = System.getProperty(interfaceName); + String resolveFile = null; + if (resolve == null || resolve.length() == 0) { + resolveFile = System.getProperty("dubbo.resolve.file"); + if (resolveFile == null || resolveFile.length() == 0) { + File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); + if (userResolveFile.exists()) { + resolveFile = userResolveFile.getAbsolutePath(); + } + } + if (resolveFile != null && resolveFile.length() > 0) { + Properties properties = new Properties(); + FileInputStream fis = null; + try { + fis = new FileInputStream(new File(resolveFile)); + properties.load(fis); + } catch (IOException e) { + throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); + } finally { + try { + if (null != fis) { + fis.close(); + } + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + resolve = properties.getProperty(interfaceName); + } + } + if (resolve != null && resolve.length() > 0) { + url = resolve; + if (logger.isWarnEnabled()) { + if (resolveFile != null) { + logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); + } else { + logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); + } + } + } + } } diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java index 07d7d78fe8f..37539eb81e5 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java @@ -310,9 +310,9 @@ protected synchronized void doExport() { if (path == null || path.length() == 0) { path = interfaceName; } - doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); + doExportUrls(); } private void checkRef() { diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/PropertySourcesUtils.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/PropertySourcesUtils.java index b61378729b7..28d43fc2179 100644 --- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/PropertySourcesUtils.java +++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/PropertySourcesUtils.java @@ -55,7 +55,7 @@ public static Map getSubProperties(Iterable> p if (name.startsWith(normalizedPrefix)) { String subName = name.substring(normalizedPrefix.length()); String value = propertyResolver.getProperty(name); - subProperties.put(subName, value); + subProperties.putIfAbsent(subName, value); } } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java index b299b2fd7e5..fcbea5fd3c3 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java @@ -49,12 +49,23 @@ public abstract class AbstractRetryTask implements TimerTask { /** * retry period */ - protected final long retryPeriod; + final long retryPeriod; + + /** + * define the most retry times + */ + private final int retryTimes; /** * task name for this task */ - protected final String taskName; + private final String taskName; + + /** + * times of retry. + * retry task is execute in single thread so that the times is not need volatile. + */ + private int times = 1; private volatile boolean cancel; @@ -67,6 +78,7 @@ public abstract class AbstractRetryTask implements TimerTask { this.taskName = taskName; cancel = false; this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); + this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES); } public void cancel() { @@ -86,7 +98,7 @@ protected void reput(Timeout timeout, long tick) { if (timer.isStop() || timeout.isCancelled() || isCancel()) { return; } - + times++; timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); } @@ -96,6 +108,11 @@ public void run(Timeout timeout) throws Exception { // other thread cancel this timeout or stop the timer. return; } + if (times > retryTimes) { + // reach the most times of retry. + logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times."); + return; + } if (logger.isInfoEnabled()) { logger.info(taskName + " : " + url); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 5fa6cfe9a47..5ec3fbec44f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -126,9 +126,9 @@ public static void sent(Channel channel, Request request) { * @param channel channel to close */ public static void closeChannel(Channel channel) { - for (long id : CHANNELS.keySet()) { - if (channel.equals(CHANNELS.get(id))) { - DefaultFuture future = getFuture(id); + for (Map.Entry entry: CHANNELS.entrySet()) { + if (channel.equals(entry.getValue())) { + DefaultFuture future = getFuture(entry.getKey()); if (future != null && !future.isDone()) { Response disconnectResponse = new Response(future.getId()); disconnectResponse.setStatus(Response.CHANNEL_INACTIVE); diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/telnet/support/command/LogTelnetHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/telnet/support/command/LogTelnetHandler.java index 121b9d3538e..bdbdebf6cfa 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/telnet/support/command/LogTelnetHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/telnet/support/command/LogTelnetHandler.java @@ -52,7 +52,7 @@ public String telnet(Channel channel, String message) { if (!StringUtils.isInteger(str[0])) { LoggerFactory.setLevel(Level.valueOf(message.toUpperCase())); } else { - int SHOW_LOG_LENGTH = Integer.parseInt(str[0]); + int showLogLength = Integer.parseInt(str[0]); if (file != null && file.exists()) { try { @@ -62,12 +62,12 @@ public String telnet(Channel channel, String message) { try { size = filechannel.size(); ByteBuffer bb; - if (size <= SHOW_LOG_LENGTH) { + if (size <= showLogLength) { bb = ByteBuffer.allocate((int) size); filechannel.read(bb, 0); } else { - int pos = (int) (size - SHOW_LOG_LENGTH); - bb = ByteBuffer.allocate(SHOW_LOG_LENGTH); + int pos = (int) (size - showLogLength); + bb = ByteBuffer.allocate(showLogLength); filechannel.read(bb, pos); } bb.flip(); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java index 5c0d9012c2c..2d026edecbd 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.Date; import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.fail; @@ -114,8 +115,8 @@ public void test_Decode_Error_MagicNum() throws IOException { inputBytes.put(new byte[]{MAGIC_HIGH, 0}, TelnetCodec.DecodeResult.NEED_MORE_INPUT); inputBytes.put(new byte[]{0, MAGIC_LOW}, TelnetCodec.DecodeResult.NEED_MORE_INPUT); - for (byte[] input : inputBytes.keySet()) { - testDecode_assertEquals(assemblyDataProtocol(input), inputBytes.get(input)); + for (Map.Entry entry: inputBytes.entrySet()) { + testDecode_assertEquals(assemblyDataProtocol(entry.getKey()), entry.getValue()); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/TelnetCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/TelnetCodecTest.java index 4a81121317a..47eb2212687 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/TelnetCodecTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/TelnetCodecTest.java @@ -35,6 +35,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.HashMap; +import java.util.Map; public class TelnetCodecTest { protected Codec2 codec; @@ -235,8 +236,8 @@ public void testDecode_WithExitByte() throws IOException { exitbytes.put(new byte[]{1, -1, -12, -1, -3, 6}, false); //must equal the bytes exitbytes.put(new byte[]{-1, -19, -1, -3, 6}, true); /* Linux Pause */ - for (byte[] exit : exitbytes.keySet()) { - testDecode_WithExitByte(exit, exitbytes.get(exit)); + for (Map.Entry entry : exitbytes.entrySet()) { + testDecode_WithExitByte(entry.getKey(), entry.getValue()); } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/logging/MessageFormatter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/logging/MessageFormatter.java index a138bcf6e60..f860875bce1 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/logging/MessageFormatter.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/logging/MessageFormatter.java @@ -175,8 +175,8 @@ static FormattingTuple arrayFormat(final String messagePattern, int j; StringBuffer sbuf = new StringBuffer(messagePattern.length() + 50); - int L; - for (L = 0; L < argArray.length; L++) { + int l; + for (l = 0; l < argArray.length; l++) { j = messagePattern.indexOf(DELIM_STR, i); @@ -194,7 +194,7 @@ static FormattingTuple arrayFormat(final String messagePattern, } else { if (isEscapedDelimeter(messagePattern, j)) { if (!isDoubleEscaped(messagePattern, j)) { - L--; // DELIM_START was escaped, thus should not be incremented + l--; // DELIM_START was escaped, thus should not be incremented sbuf.append(messagePattern.substring(i, j - 1)); sbuf.append(DELIM_START); i = j + 1; @@ -203,20 +203,20 @@ static FormattingTuple arrayFormat(final String messagePattern, // itself escaped: "abc x:\\{}" // we have to consume one backward slash sbuf.append(messagePattern.substring(i, j - 1)); - deeplyAppendParameter(sbuf, argArray[L], new HashMap()); + deeplyAppendParameter(sbuf, argArray[l], new HashMap()); i = j + 2; } } else { // normal case sbuf.append(messagePattern.substring(i, j)); - deeplyAppendParameter(sbuf, argArray[L], new HashMap()); + deeplyAppendParameter(sbuf, argArray[l], new HashMap()); i = j + 2; } } } // append the characters following the last {} pair. sbuf.append(messagePattern.substring(i, messagePattern.length())); - if (L < argArray.length - 1) { + if (l < argArray.length - 1) { return new FormattingTuple(sbuf.toString(), argArray, throwableCandidate); } else { return new FormattingTuple(sbuf.toString(), argArray, null); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java index 3483a607056..9d3b7c4d506 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java @@ -19,7 +19,26 @@ import org.apache.dubbo.common.extension.SPI; /** + * Extension for intercepting the invocation for both service provider and consumer, furthermore, most of + * functions in dubbo are implemented base on the same mechanism. Since every time when remote method is + * invoked, the filter extensions will be executed too, the corresponding penalty should be considered before + * more filters are added. + *
+ *  They way filter work from sequence point of view is
+ *    
+ *    ...code before filter ...
+ *          invoker.invoke(invocation) //filter work in a filter implementation class
+ *          ...code after filter ...
+ *    
+ *    Caching is implemented in dubbo using filter approach. If cache is configured for invocation then before
+ *    remote call configured caching type's (e.g. Thread Local, JCache etc) implementation invoke method gets called.
+ * 
* Filter. (SPI, Singleton, ThreadSafe) + * + * @see org.apache.dubbo.rpc.filter.GenericFilter + * @see org.apache.dubbo.rpc.filter.EchoFilter + * @see org.apache.dubbo.rpc.filter.TokenFilter + * @see org.apache.dubbo.rpc.filter.TpsLimitFilter */ @SPI public interface Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcResult.java index 3f27420021d..dfec74c4fff 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcResult.java @@ -16,6 +16,8 @@ */ package org.apache.dubbo.rpc; +import java.lang.reflect.Field; + /** * RPC Result. * @@ -39,6 +41,23 @@ public RpcResult(Throwable exception) { @Override public Object recreate() throws Throwable { if (exception != null) { + // fix issue#619 + try { + // get Throwable class + Class clazz = exception.getClass(); + while (!clazz.getName().equals(Throwable.class.getName())) { + clazz = clazz.getSuperclass(); + } + // get stackTrace value + Field stackTraceField = clazz.getDeclaredField("stackTrace"); + stackTraceField.setAccessible(true); + Object stackTrace = stackTraceField.get(exception); + if (stackTrace == null) { + exception.setStackTrace(new StackTraceElement[0]); + } + } catch (Exception e) { + // ignore + } throw exception; } return result; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java index 8b20cc44934..de68434e155 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java @@ -16,12 +16,10 @@ */ package org.apache.dubbo.rpc; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -47,12 +45,6 @@ public class RpcStatus { private final AtomicLong failedMaxElapsed = new AtomicLong(); private final AtomicLong succeededMaxElapsed = new AtomicLong(); - /** - * Semaphore used to control concurrency limit set by `executes` - */ - private volatile Semaphore executesLimit; - private volatile int executesPermits; - private RpcStatus() { } @@ -109,16 +101,23 @@ public static void removeStatus(URL url, String methodName) { } } - /** - * @param url - */ public static void beginCount(URL url, String methodName) { - beginCount(getStatus(url)); - beginCount(getStatus(url, methodName)); + beginCount(url, methodName, Integer.MAX_VALUE); } - private static void beginCount(RpcStatus status) { - status.active.incrementAndGet(); + /** + * @param url + */ + public static boolean beginCount(URL url, String methodName, int max) { + RpcStatus appStatus = getStatus(url); + RpcStatus methodStatus = getStatus(url, methodName); + if (methodStatus.active.incrementAndGet() > max) { + methodStatus.active.decrementAndGet(); + return false; + } else { + appStatus.active.incrementAndGet(); + return true; + } } /** @@ -312,26 +311,5 @@ public long getAverageTps() { return getTotal(); } - /** - * Get the semaphore for thread number. Semaphore's permits is decided by {@link Constants#EXECUTES_KEY} - * - * @param maxThreadNum value of {@link Constants#EXECUTES_KEY} - * @return thread number semaphore - */ - public Semaphore getSemaphore(int maxThreadNum) { - if(maxThreadNum <= 0) { - return null; - } - if (executesLimit == null || executesPermits != maxThreadNum) { - synchronized (this) { - if (executesLimit == null || executesPermits != maxThreadNum) { - executesLimit = new Semaphore(maxThreadNum); - executesPermits = maxThreadNum; - } - } - } - - return executesLimit; - } -} \ No newline at end of file +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java index 7f7aff8024c..696840f6ceb 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java @@ -27,7 +27,16 @@ import org.apache.dubbo.rpc.RpcStatus; /** - * LimitInvokerFilter + * ActiveLimitFilter restrict the concurrent client invocation for a service or service's method from client side. + * To use active limit filter, configured url with actives and provide valid >0 integer value. + *
+ *     e.g. 
+ *      In the above example maximum 2 concurrent invocation is allowed.
+ *      If there are more than configured (in this example 2) is trying to invoke remote method, then rest of invocation
+ *      will wait for configured timeout(default is 0 second) before invocation gets kill by dubbo.
+ * 
+ * + * @see Filter */ @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY) public class ActiveLimitFilter implements Filter { @@ -38,49 +47,44 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept String methodName = invocation.getMethodName(); int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); - if (max > 0) { + if (max > 0 && !RpcStatus.beginCount(url, methodName, max)) { long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; - int active = count.getActive(); - if (active >= max) { - synchronized (count) { - while ((active = count.getActive()) >= max) { - try { - count.wait(remain); - } catch (InterruptedException e) { - } - long elapsed = System.currentTimeMillis() - start; - remain = timeout - elapsed; - if (remain <= 0) { - throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " - + invoker.getInterface().getName() + ", method: " - + invocation.getMethodName() + ", elapsed: " + elapsed - + ", timeout: " + timeout + ". concurrent invokes: " + active - + ". max concurrent invoke limit: " + max); - } + synchronized (count) { + while (!RpcStatus.beginCount(url, methodName, max)) { + try { + count.wait(remain); + } catch (InterruptedException e) { + // ignore + } + long elapsed = System.currentTimeMillis() - start; + remain = timeout - elapsed; + if (remain <= 0) { + throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + + invoker.getInterface().getName() + ", method: " + + invocation.getMethodName() + ", elapsed: " + elapsed + + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive() + + ". max concurrent invoke limit: " + max); } } } } + + boolean isSuccess = true; + long begin = System.currentTimeMillis(); try { - long begin = System.currentTimeMillis(); - RpcStatus.beginCount(url, methodName); - try { - Result result = invoker.invoke(invocation); - RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); - return result; - } catch (RuntimeException t) { - RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); - throw t; - } + return invoker.invoke(invocation); + } catch (RuntimeException t) { + isSuccess = false; + throw t; } finally { + RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if (max > 0) { synchronized (count) { - count.notify(); + count.notifyAll(); } } } } - } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java index 17a6ca01f68..17d265ef6bb 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java @@ -25,7 +25,7 @@ import org.apache.dubbo.rpc.RpcException; /** - * ClassLoaderInvokerFilter + * Set the current execution thread class loader to service interface's class loader. */ @Activate(group = Constants.PROVIDER, order = -30000) public class ClassLoaderFilter implements Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CompatibleFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CompatibleFilter.java index aef65aa36be..ace832ca33c 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CompatibleFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/CompatibleFilter.java @@ -32,7 +32,18 @@ import java.lang.reflect.Type; /** - * CompatibleFilter + * CompatibleFilter make the remote method's return value compatible to invoker's version of object. + * To make return object compatible it does + *
+ *    1)If the url contain serialization key of type json or fastjson then transform
+ *    the return value to instance of {@link java.util.Map}
+ *    2)If the return value is not a instance of invoked method's return type available at
+ *    local jvm then POJO conversion.
+ *    3)If return value is other than above return value as it is.
+ * 
+ * + * @see Filter + * */ public class CompatibleFilter implements Filter { @@ -51,9 +62,11 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept String serialization = invoker.getUrl().getParameter(Constants.SERIALIZATION_KEY); if ("json".equals(serialization) || "fastjson".equals(serialization)) { + // If the serialization key is json or fastjson Type gtype = method.getGenericReturnType(); newValue = PojoUtils.realize(value, type, gtype); } else if (!type.isInstance(value)) { + //if local service interface's method's return type is not instance of return value newValue = PojoUtils.isPojo(type) ? PojoUtils.realize(value, type) : CompatibleTypeUtils.compatibleTypeConvert(value, type); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java index f852d7b9e96..ba5e77b2740 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java @@ -28,7 +28,12 @@ import org.apache.dubbo.rpc.RpcInvocation; /** - * ConsumerContextInvokerFilter + * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port + * for consumer invoker.It does it to make the requires info available to execution thread's RpcContext. + * + * @see org.apache.dubbo.rpc.Filter + * @see org.apache.dubbo.rpc.PostProcessFilter + * @see RpcContext */ @Activate(group = Constants.CONSUMER, order = -10000) public class ConsumerContextFilter extends AbstractPostProcessFilter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java index 2e7d0c5ae90..344419d1504 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java @@ -30,7 +30,11 @@ import java.util.Map; /** - * ContextInvokerFilter + * ContextFilter set the provider RpcContext with invoker, invocation, local port it is using and host for + * current execution thread. + * + * @see RpcContext + * @see org.apache.dubbo.rpc.PostProcessFilter */ @Activate(group = Constants.PROVIDER, order = -10000) public class ContextFilter extends AbstractPostProcessFilter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/DeprecatedFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/DeprecatedFilter.java index 14732ddbd22..95803cbd1a7 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/DeprecatedFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/DeprecatedFilter.java @@ -30,7 +30,10 @@ import java.util.Set; /** - * DeprecatedInvokerFilter + * DeprecatedFilter logs error message if a invoked method has been marked as deprecated. To check whether a method + * is deprecated or not it looks for deprecated attribute value and consider it is deprecated it value is true + * + * @see Filter */ @Activate(group = Constants.CONSUMER, value = Constants.DEPRECATED_KEY) public class DeprecatedFilter implements Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/EchoFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/EchoFilter.java index 5dfa8f0813e..cba9e7f4f46 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/EchoFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/EchoFilter.java @@ -26,7 +26,7 @@ import org.apache.dubbo.rpc.RpcResult; /** - * EchoInvokerFilter + * Dubbo provided default Echo echo service, which is available for all dubbo provider service interface. */ @Activate(group = Constants.PROVIDER, order = -110000) public class EchoFilter implements Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java index 17e05f785f2..d0995da7bd6 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java @@ -26,10 +26,11 @@ import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcStatus; -import java.util.concurrent.Semaphore; - /** - * ThreadLimitInvokerFilter + * The maximum parallel execution request count per method per service for the provider.If the max configured + * executes is set to 10 and if invoke request where it is already 10 then it will throws exception. It + * continue the same behaviour un till it is <10. + * */ @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY) public class ExecuteLimitFilter implements Filter { @@ -38,27 +39,17 @@ public class ExecuteLimitFilter implements Filter { public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); - Semaphore executesLimit = null; - boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); - if (max > 0) { - RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); -// if (count.getActive() >= max) { - /** - * http://manzhizhen.iteye.com/blog/2386408 - * use semaphore for concurrency control (to limit thread number) - */ - executesLimit = count.getSemaphore(max); - if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { - throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than limited."); - } + if (max > 0 && !RpcStatus.beginCount(url, methodName, max)) { + throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + + url + ", cause: The service using threads greater than limited."); } + long begin = System.currentTimeMillis(); boolean isSuccess = true; - RpcStatus.beginCount(url, methodName); try { - Result result = invoker.invoke(invocation); - return result; + return invoker.invoke(invocation); } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { @@ -68,9 +59,6 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept } } finally { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); - if(acquireResult) { - executesLimit.release(); - } } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenFilter.java index 3b9695fb16f..35ef2aca526 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenFilter.java @@ -29,7 +29,10 @@ import java.util.Map; /** - * TokenInvokerFilter + * Perform check whether given provider token is matching with remote token or not. If it does not match + * it will not allow to invoke remote method. + * + * @see Filter */ @Activate(group = Constants.PROVIDER, value = Constants.TOKEN_KEY) public class TokenFilter implements Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TpsLimitFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TpsLimitFilter.java index ad4137a0505..5ce78d141ee 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TpsLimitFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TpsLimitFilter.java @@ -28,8 +28,12 @@ import org.apache.dubbo.rpc.filter.tps.TPSLimiter; /** - * Limit TPS for either service or service's particular method - */ + * TpsLimitFilter limit the TPS (transaction per second) for all method of a service or a particular method. + * Service or method url can define tps or tps.interval to control this control.It use {@link DefaultTPSLimiter} + * as it limit checker. If a provider service method is configured with tps(optionally with tps.interval),then + * if invocation count exceed the configured tps value (default is -1 which means unlimited) then invocation will get + * RpcException. + * */ @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY) public class TpsLimitFilter implements Filter { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java index 431a31f7ae2..71b579bc75d 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java @@ -23,6 +23,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +/** + * DefaultTPSLimiter is a default implementation for tps filter. It is an in memory based implementation for stroring + * tps information. It internally use + * + * @see org.apache.dubbo.rpc.filter.TpsLimitFilter + */ public class DefaultTPSLimiter implements TPSLimiter { private final ConcurrentMap stats diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java index 6ef9ab524bd..2fcdaef82a8 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java @@ -18,6 +18,10 @@ import java.util.concurrent.atomic.AtomicInteger; +/** + * Judge whether a particular invocation of service provider method should be allowed within a configured time interval. + * As a state it contain name of key ( e.g. method), last invocation time, interval and rate count. + */ class StatItem { private String name; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java index 7145a96cdf9..f104eb12c3a 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java @@ -19,6 +19,14 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; +/** + * Provide boolean information whether a invocation of a provider service's methods or a particular method + * is allowed within a last invocation and current invocation. + *
+ *     e.g. if tps for a method m1 is 5 for a minute then if 6th call is made within the span of 1 minute then 6th
+ *     should not be allowed isAllowable will return false.
+ * 
+ */ public interface TPSLimiter { /** diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcResultTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcResultTest.java new file mode 100644 index 00000000000..8083a604ea0 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcResultTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + + +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class RpcResultTest { + @Test + public void testRecreateWithNormalException() { + NullPointerException npe = new NullPointerException(); + RpcResult rpcResult = new RpcResult(npe); + try { + rpcResult.recreate(); + fail(); + } catch (Throwable throwable) { + StackTraceElement[] stackTrace = throwable.getStackTrace(); + Assert.assertNotNull(stackTrace); + Assert.assertTrue(stackTrace.length > 1); + } + } + + /** + * please run this test in Run mode + */ + @Test + public void testRecreateWithEmptyStackTraceException() { + // begin to construct a NullPointerException with empty stackTrace + Throwable throwable = null; + Long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < 60000) { + try { + ((Object) null).getClass(); + } catch (Exception e) { + if (e.getStackTrace().length == 0) { + throwable = e; + break; + } + } + } + /** + * may be there is -XX:-OmitStackTraceInFastThrow or run in Debug mode + */ + if (throwable == null) { + System.out.println("###testRecreateWithEmptyStackTraceException fail to construct NPE"); + return; + } + // end construct a NullPointerException with empty stackTrace + + RpcResult rpcResult = new RpcResult(throwable); + try { + rpcResult.recreate(); + fail(); + } catch (Throwable t) { + StackTraceElement[] stackTrace = t.getStackTrace(); + Assert.assertNotNull(stackTrace); + Assert.assertTrue(stackTrace.length == 0); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java index b49e10fe25e..032bb8c8dc8 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java @@ -21,6 +21,7 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcStatus; import org.apache.dubbo.rpc.support.BlockMyInvoker; import org.apache.dubbo.rpc.support.MockInvocation; import org.apache.dubbo.rpc.support.MyInvoker; @@ -28,7 +29,9 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; /** @@ -36,7 +39,6 @@ */ public class ActiveLimitFilterTest { - private static volatile int count = 0; Filter activeLimitFilter = new ActiveLimitFilter(); @Test @@ -57,6 +59,7 @@ public void testInvokeLessActives() { @Test public void testInvokeGreaterActives() { + AtomicInteger count = new AtomicInteger(0); URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives=1&timeout=1"); final Invoker invoker = new BlockMyInvoker(url, 100); final Invocation invocation = new MockInvocation(); @@ -74,7 +77,7 @@ public void run() { try { activeLimitFilter.invoke(invoker, invocation); } catch (RpcException expected) { - count++; + count.incrementAndGet(); } } } @@ -88,6 +91,94 @@ public void run() { } catch (InterruptedException e) { e.printStackTrace(); } - assertNotSame(0, count); + assertNotSame(0, count.intValue()); } -} \ No newline at end of file + + @Test + public void testInvokeTimeOut() { + int totalThread = 100; + int maxActives = 10; + long timeout = 1; + long blockTime = 100; + AtomicInteger count = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latchBlocking = new CountDownLatch(totalThread); + URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout); + final Invoker invoker = new BlockMyInvoker(url, blockTime); + final Invocation invocation = new MockInvocation(); + RpcStatus.removeStatus(url); + RpcStatus.removeStatus(url, invocation.getMethodName()); + for (int i = 0; i < totalThread; i++) { + Thread thread = new Thread(new Runnable() { + public void run() { + try{ + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + activeLimitFilter.invoke(invoker, invocation); + } catch (RpcException expected) { + count.incrementAndGet(); + } + }finally { + latchBlocking.countDown(); + } + } + }); + thread.start(); + } + latch.countDown(); + + try { + latchBlocking.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertEquals(90, count.intValue()); + } + + @Test + public void testInvokeNotTimeOut() { + int totalThread = 100; + int maxActives = 10; + long timeout = 1000; + long blockTime = 0; + AtomicInteger count = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latchBlocking = new CountDownLatch(totalThread); + URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout); + final Invoker invoker = new BlockMyInvoker(url, blockTime); + final Invocation invocation = new MockInvocation(); + for (int i = 0; i < totalThread; i++) { + Thread thread = new Thread(new Runnable() { + public void run() { + try{ + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + activeLimitFilter.invoke(invoker, invocation); + } catch (RpcException expected) { + count.incrementAndGet(); + } + }finally { + latchBlocking.countDown(); + } + } + }); + thread.start(); + } + latch.countDown(); + + try { + latchBlocking.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertEquals(0, count.intValue()); + } +} diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java index 426fe91730c..aa5c02b4a90 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java @@ -131,7 +131,7 @@ private static String exportOrunexportCallbackService(Channel channel, URL url, * @param url */ @SuppressWarnings("unchecked") - private static Object referOrdestroyCallbackService(Channel channel, URL url, Class clazz, Invocation inv, int instid, boolean isRefer) { + private static Object referOrDestroyCallbackService(Channel channel, URL url, Class clazz, Invocation inv, int instid, boolean isRefer) { Object proxy = null; String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid); String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid); @@ -150,7 +150,7 @@ private static Object referOrdestroyCallbackService(Channel channel, URL url, Cl increaseInstanceCount(channel, countkey); //convert error fail fast . - //ignore concurrent problem. + //ignore concurrent problem. Set> callbackInvokers = (Set>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); if (callbackInvokers == null) { callbackInvokers = new ConcurrentHashSet>(1); @@ -280,14 +280,14 @@ public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv return inObject; case CallbackServiceCodec.CALLBACK_CREATE: try { - return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); + return referOrDestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); } catch (Exception e) { logger.error(e.getMessage(), e); throw new IOException(StringUtils.toString(e)); } case CallbackServiceCodec.CALLBACK_DESTROY: try { - return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); + return referOrDestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); } catch (Exception e) { throw new IOException(StringUtils.toString(e)); } @@ -295,4 +295,4 @@ public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv return inObject; } } -} \ No newline at end of file +}