diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java index 956ff7709fb..573e8a1c863 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java @@ -16,7 +16,6 @@ */ package org.apache.dubbo.rpc.cluster.loadbalance; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -27,7 +26,6 @@ /** * LeastActiveLoadBalance - * */ public class LeastActiveLoadBalance extends AbstractLoadBalance { @@ -39,26 +37,26 @@ protected Invoker doSelect(List> invokers, URL url, Invocation int leastActive = -1; // The least active value of all invokers int leastCount = 0; // The number of invokers having the same least active value (leastActive) int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive) - int totalWeight = 0; // The sum of weights + int totalWeight = 0; // The sum of with warmup weights int firstWeight = 0; // Initial value, used for comparision boolean sameWeight = true; // Every invoker has the same weight value? for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number - int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight + int afterWarmup = getWeight(invoker, invocation); if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexes[0] = i; // Reset - totalWeight = weight; // Reset - firstWeight = weight; // Record the weight the first invoker + totalWeight = afterWarmup; // Reset + firstWeight = afterWarmup; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. leastIndexes[leastCount++] = i; // Record index number of this invoker - totalWeight += weight; // Add this invoker's weight to totalWeight. + totalWeight += afterWarmup; // Add this invoker's with warmup weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 - && weight != firstWeight) { + && afterWarmup != firstWeight) { sameWeight = false; } } @@ -70,7 +68,7 @@ protected Invoker doSelect(List> invokers, URL url, Invocation } if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. - int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); + int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1; // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java index fd151bcdb2f..836994fdcc8 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LeastActiveBalanceTest.java @@ -17,7 +17,6 @@ package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.rpc.Invoker; - import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -39,4 +38,31 @@ public void testLeastActiveLoadBalance_select() { } } + @Test + public void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int loop = 10000; + + LeastActiveLoadBalance lb = new LeastActiveLoadBalance(); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + // never select invoker3 because it's active is more than invoker1 and invoker2 + Assert.assertTrue("select is not the least active one", !selected.getUrl().getProtocol().equals("test3")); + } + + // the sumInvoker1 : sumInvoker2 approximately equal to 1: 9 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + + Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2, loop); + } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java index 877631f56e5..f9db9aeca09 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java @@ -21,8 +21,9 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.RpcStatus; import org.apache.dubbo.rpc.cluster.LoadBalance; - import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -50,6 +51,12 @@ public class LoadBalanceBaseTest { Invoker invoker4; Invoker invoker5; + RpcStatus weightTestRpcStatus1; + RpcStatus weightTestRpcStatus2; + RpcStatus weightTestRpcStatus3; + + RpcInvocation weightTestInvocation; + /** * @throws java.lang.Exception */ @@ -145,4 +152,55 @@ private static int calculateDefaultWarmupWeight(int uptime) { return AbstractLoadBalance.calculateWarmupWeight(uptime, Constants.DEFAULT_WARMUP, Constants.DEFAULT_WEIGHT); } + /*------------------------------------test invokers for weight---------------------------------------*/ + + protected List> weightInvokers = new ArrayList>(); + protected Invoker weightInvoker1; + protected Invoker weightInvoker2; + protected Invoker weightInvoker3; + + @Before + public void before() throws Exception { + weightInvoker1 = mock(Invoker.class); + weightInvoker2 = mock(Invoker.class); + weightInvoker3 = mock(Invoker.class); + + weightTestInvocation = new RpcInvocation(); + weightTestInvocation.setMethodName("test"); + + URL url1 = URL.valueOf("test1://0:1/DemoService"); + url1 = url1.addParameter(Constants.WEIGHT_KEY, 1); + url1 = url1.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 1); + url1 = url1.addParameter("active", 0); + + URL url2 = URL.valueOf("test2://0:9/DemoService"); + url2 = url2.addParameter(Constants.WEIGHT_KEY, 9); + url2 = url2.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 9); + url2 = url2.addParameter("active", 0); + + URL url3 = URL.valueOf("test3://1:6/DemoService"); + url3 = url3.addParameter(Constants.WEIGHT_KEY, 6); + url3 = url3.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 6); + url3 = url3.addParameter("active", 1); + + given(weightInvoker1.isAvailable()).willReturn(true); + given(weightInvoker1.getUrl()).willReturn(url1); + + given(weightInvoker2.isAvailable()).willReturn(true); + given(weightInvoker2.getUrl()).willReturn(url2); + + given(weightInvoker3.isAvailable()).willReturn(true); + given(weightInvoker3.getUrl()).willReturn(url3); + + weightInvokers.add(weightInvoker1); + weightInvokers.add(weightInvoker2); + weightInvokers.add(weightInvoker3); + + weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName()); + weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName()); + weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName()); + + // weightTestRpcStatus3 active is 1 + RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName()); + } } \ No newline at end of file diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java index 3f20291d3dc..d31a85d08d4 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RandomLoadBalanceTest.java @@ -18,7 +18,6 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcStatus; - import org.junit.Assert; import org.junit.Test; @@ -54,4 +53,35 @@ public void testRandomLoadBalanceSelect() { Assert.assertEquals(0, counter.get(invoker5).intValue()); } + @Test + public void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker3 = 0; + int loop = 10000; + + RandomLoadBalance lb = new RandomLoadBalance(); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + + if (selected.getUrl().getProtocol().equals("test3")) { + sumInvoker3++; + } + } + + // 1 : 9 : 6 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + System.out.println(sumInvoker3); + Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop); + } + } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java index f3ae3518740..e10f69fea62 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalanceTest.java @@ -17,7 +17,6 @@ package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.rpc.Invoker; - import org.junit.Assert; import org.junit.Test; @@ -34,4 +33,36 @@ public void testRoundRobinLoadBalanceSelect() { Assert.assertTrue("abs diff should < 1", Math.abs(count - runs / (0f + invokers.size())) < 1f); } } + + @Test + public void testSelectByWeight() { + int sumInvoker1 = 0; + int sumInvoker2 = 0; + int sumInvoker3 = 0; + int loop = 10000; + + RoundRobinLoadBalance lb = new RoundRobinLoadBalance(); + for (int i = 0; i < loop; i++) { + Invoker selected = lb.select(weightInvokers, null, weightTestInvocation); + + if (selected.getUrl().getProtocol().equals("test1")) { + sumInvoker1++; + } + + if (selected.getUrl().getProtocol().equals("test2")) { + sumInvoker2++; + } + + if (selected.getUrl().getProtocol().equals("test3")) { + sumInvoker3++; + } + } + + // 1 : 9 : 6 + System.out.println(sumInvoker1); + System.out.println(sumInvoker2); + System.out.println(sumInvoker3); + Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop); + } + }