Skip to content

Commit

Permalink
Merge pull request #1 from apache/master
Browse files Browse the repository at this point in the history
Update from upstream
  • Loading branch information
khanimteyaz authored Dec 6, 2018
2 parents 133d570 + 628ad77 commit 4704218
Show file tree
Hide file tree
Showing 19 changed files with 131 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
Expand All @@ -32,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
Expand Down Expand Up @@ -62,7 +64,14 @@ public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;

if (url.getProtocol().equals(Constants.REGISTRY_PROTOCOL)) {
Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.url = url.clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
} else {
this.url = url;
}

this.consumerUrl = consumerUrl;
setRouters(routers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,75 @@

/**
* LeastActiveLoadBalance
* <p>
* Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers.
* If there is only one invoker, use the invoker directly;
* if there are multiple invokers and the weights are not the same, then random according to the total weight;
* if there are multiple invokers and the same weight, then randomly called.
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// The sum of the warmup weights of all the least active invokes
int totalWeight = 0;
// The weight of the first least active invoke
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;

// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
// Get the active number of the invoke
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexes[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexes[leastCount++] = i; // Record index number of this invoker
totalWeight += afterWarmup; // Add this invoker's with warmup weight to totalWeight.
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
Expand All @@ -81,4 +108,4 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* ConditionRouter
*
*/
public class ConditionRouter implements Router, Comparable<Router> {
public class ConditionRouter implements Router {

private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class);
private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* TagRouter
*/
public class TagRouter implements Router, Comparable<Router> {
public class TagRouter implements Router {

private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,27 @@ public FailoverClusterInvoker(Directory<T> directory) {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
copyInvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Expand All @@ -83,7 +83,7 @@ public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, L
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
Expand All @@ -104,7 +104,7 @@ public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, L
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
package org.apache.dubbo.rpc.cluster.router.script;


import java.util.ArrayList;
import java.util.List;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.router.MockInvoker;

import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.List;

public class ScriptRouterTest {

private URL SCRIPT_URL = URL.valueOf("script://javascript?type=javascript");
Expand Down Expand Up @@ -85,5 +83,48 @@ public void testRoutePickInvokers() {
Assert.assertEquals(invoker3, filteredInvokers.get(1));
}

//TODO Add tests for abnormal scene
@Test
public void testRouteHostFilter() {
List<Invoker<String>> invokers = new ArrayList<Invoker<String>>();
MockInvoker<String> invoker1 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.1:20880/com.dubbo.HelloService"));
MockInvoker<String> invoker2 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.2:20880/com.dubbo.HelloService"));
MockInvoker<String> invoker3 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.3:20880/com.dubbo.HelloService"));
invokers.add(invoker1);
invokers.add(invoker2);
invokers.add(invoker3);

String script = "function route(invokers, invocation, context){ " +
" var result = new java.util.ArrayList(invokers.size()); " +
" var targetHost = new java.util.ArrayList(); " +
" targetHost.add(\"10.134.108.2\"); " +
" for (var i = 0; i < invokers.length; i++) { " +
" if(targetHost.contains(invokers[i].getUrl().getHost())){ " +
" result.add(invokers[i]); " +
" } " +
" } " +
" return result; " +
"} " +
"route(invokers, invocation, context) ";

Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script));
List<Invoker<String>> routeResult = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation());
Assert.assertEquals(1, routeResult.size());
Assert.assertEquals(invoker2,routeResult.get(0));
}

@Test
public void testRoute_throwException() {
List<Invoker<String>> invokers = new ArrayList<Invoker<String>>();
MockInvoker<String> invoker1 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.1:20880/com.dubbo.HelloService"));
MockInvoker<String> invoker2 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.2:20880/com.dubbo.HelloService"));
MockInvoker<String> invoker3 = new MockInvoker<String>(URL.valueOf("dubbo://10.134.108.3:20880/com.dubbo.HelloService"));
invokers.add(invoker1);
invokers.add(invoker2);
invokers.add(invoker3);

String script = "/";
Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script));
List<Invoker<String>> routeResult = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation());
Assert.assertEquals(3, routeResult.size());
}
}
2 changes: 1 addition & 1 deletion dubbo-cluster/src/test/resources/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="DUBBO" class="org.apache.dubbo.common.utils.DubboAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n"/>
<param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n"/>
</layout>
</appender>
<root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,8 @@ public boolean hasExtension(String name) {
if (name == null || name.length() == 0) {
throw new IllegalArgumentException("Extension name == null");
}
try {
this.getExtensionClass(name);
return true;
} catch (Throwable t) {
return false;
}
Class<?> c = this.getExtensionClass(name);
return c != null;
}

public Set<String> getSupportedExtensions() {
Expand Down Expand Up @@ -565,11 +561,7 @@ private Class<?> getExtensionClass(String name) {
if (name == null) {
throw new IllegalArgumentException("Extension name == null");
}
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
}
return clazz;
return getExtensionClasses().get(name);
}

private Map<String, Class<?>> getExtensionClasses() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.junit.Ignore;
import org.junit.Test;


Expand Down Expand Up @@ -87,6 +88,7 @@ public void run() {


@Test
@Ignore
public void testCustomExecutor() {
Executor mockedExecutor = mock(Executor.class);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
Expand Down
2 changes: 1 addition & 1 deletion dubbo-config/dubbo-config-api/src/test/resources/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n"/>
<param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n"/>
</layout>
</appender>
<root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n"/>
<param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n"/>
</layout>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n"/>
<param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n"/>
</layout>
</appender>
<root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -594,12 +593,6 @@ public List<Invoker<T>> doList(Invocation invocation) {
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
Expand Down
Loading

0 comments on commit 4704218

Please sign in to comment.