From d2b5914c359758d7339e028938bc62cad1183c0b Mon Sep 17 00:00:00 2001 From: yizhenqiang Date: Sun, 3 Feb 2019 16:35:21 +0800 Subject: [PATCH] Support multiple shared links (#2457) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * make dubbo support multiple shared links, upgrading RPC throughput * Fix compilation error * Fix compilation error * opti import * if add {} * checkstyle fail * fix getSharedClient referenceCount calculation error bug * 优化 import * Fix the problem that the getSharedClient thread is not safe * Fix the problem that the getSharedClient thread is not safe * Try fixing ci error, https://travis-ci.org/apache/incubator-dubbo/jobs/453185295 * 将DEFAULT_CONNECTIONS_KEY修改成SERVICE_CONNECTIONS_KEY * dubbo.xsd add shareconnections attribute, * Optimize code format * Fix mult connect ghost connect problem * format code * Remove the concept of ghostClientMap and ghost connection. In fact, ghostClient is LazyConnectExchangeClient. At present, the LazyConnectExchangeClient object is added directly in ReferenceCountExchangeClient to realize the mapping relationship with ReferenceCountExchangeClient. The relationship between previous ghostClient and url mapping is not applicable to the current new share. Multiple connections. * Optimize the ReferenceCountExchangeClient and remove the reference to the lazyConnectExchangeClient because it doesn't make much sense; add locks in the close operation of the AbstractClient, because connect, disconnect, and close should not be done at the same time. * format code * try remove close lock * Restore close method * Restore ReferenceCountExchangeClient reference to LazyConnectExchangeClient object * Optimize the logic of using the LazyConnectExchangeClient inside the ReferenceCountExchangeClient; Supplemental shared multi-connected unit test --- .../org/apache/dubbo/common/Constants.java | 8 +- .../apache/dubbo/config/ConsumerConfig.java | 14 + .../src/main/resources/META-INF/dubbo.xsd | 6 + .../remoting/transport/AbstractClient.java | 13 +- .../rpc/protocol/dubbo/DubboProtocol.java | 330 +++++++++++++----- .../dubbo/LazyConnectExchangeClient.java | 1 - .../dubbo/ReferenceCountExchangeClient.java | 36 +- .../ReferenceCountExchangeClientTest.java | 94 ++++- 8 files changed, 379 insertions(+), 123 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index e5ac1750a2e..11807313d90 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -146,7 +146,13 @@ public class Constants { public static final int DEFAULT_ALIVE = 60 * 1000; - public static final int DEFAULT_CONNECTIONS = 0; + /** + * By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set), + * which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection. + */ + public static final String DEFAULT_SHARE_CONNECTIONS = "1"; + + public static final String SHARE_CONNECTIONS_KEY = "shareconnections"; public static final int DEFAULT_ACCEPTS = 0; diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java index ce106550601..3a556e7f8c4 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java @@ -57,6 +57,12 @@ public class ConsumerConfig extends AbstractReferenceConfig { */ private Integer queues; + /** + * By default, a TCP long-connection communication is shared between the consumer process and the provider process. + * This property can be set to share multiple TCP long-connection communications. Note that only the dubbo protocol takes effect. + */ + private Integer shareconnections; + @Override public void setTimeout(Integer timeout) { super.setTimeout(timeout); @@ -118,4 +124,12 @@ public Integer getQueues() { public void setQueues(Integer queues) { this.queues = queues; } + + public Integer getShareconnections() { + return shareconnections; + } + + public void setShareconnections(Integer shareconnections) { + this.shareconnections = shareconnections; + } } diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd index c5b890c7fc4..41af239deb5 100644 --- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd +++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd @@ -891,6 +891,12 @@ + + + + + + diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 4138398f235..2afdc4d0f02 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -174,17 +174,22 @@ public void send(Object message, boolean sent) throws RemotingException { } protected void connect() throws RemotingException { + connectLock.lock(); + try { + if (isConnected()) { return; } doConnect(); + if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms."); + } else { if (logger.isInfoEnabled()) { logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " @@ -192,12 +197,15 @@ protected void connect() throws RemotingException { + ", channel is " + this.getChannel()); } } + } catch (RemotingException e) { throw e; + } catch (Throwable e) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); + } finally { connectLock.unlock(); } @@ -241,11 +249,13 @@ public void reconnect() throws RemotingException { @Override public void close() { + try { super.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { if (executor != null) { ExecutorUtil.shutdownNow(executor, 100); @@ -253,11 +263,13 @@ public void close() { } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { disconnect(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { doClose(); } catch (Throwable e) { @@ -310,5 +322,4 @@ public String toString() { * @return channel */ protected abstract Channel getChannel(); - } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index e9f42dd56f4..f284dd40dd6 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -22,7 +22,9 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; @@ -49,11 +51,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * dubbo protocol support. @@ -65,6 +69,7 @@ public class DubboProtocol extends AbstractProtocol { public static final int DEFAULT_PORT = 20880; private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; private static DubboProtocol INSTANCE; + /** * */ @@ -72,8 +77,7 @@ public class DubboProtocol extends AbstractProtocol { /** * */ - private final Map referenceClientMap = new ConcurrentHashMap<>(); - private final ConcurrentMap ghostClientMap = new ConcurrentHashMap<>(); + private final Map> referenceClientMap = new ConcurrentHashMap<>(); private final ConcurrentMap locks = new ConcurrentHashMap<>(); private final Set optimizers = new ConcurrentHashSet<>(); /** @@ -81,55 +85,60 @@ public class DubboProtocol extends AbstractProtocol { * servicekey-stubmethods */ private final ConcurrentMap stubServiceMethodsMap = new ConcurrentHashMap<>(); + private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - Invocation inv = (Invocation) message; - Invoker invoker = getInvoker(channel, inv); - // need to consider backward-compatibility if it's a callback - if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { - String methodsStr = invoker.getUrl().getParameters().get("methods"); - boolean hasMethod = false; - if (methodsStr == null || !methodsStr.contains(",")) { - hasMethod = inv.getMethodName().equals(methodsStr); - } else { - String[] methods = methodsStr.split(","); - for (String method : methods) { - if (inv.getMethodName().equals(method)) { - hasMethod = true; - break; - } + + if (!(message instanceof Invocation)) { + throw new RemotingException(channel, "Unsupported request: " + + (message == null ? null : (message.getClass().getName() + ": " + message)) + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + } + + Invocation inv = (Invocation) message; + Invoker invoker = getInvoker(channel, inv); + // need to consider backward-compatibility if it's a callback + if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { + String methodsStr = invoker.getUrl().getParameters().get("methods"); + boolean hasMethod = false; + if (methodsStr == null || !methodsStr.contains(",")) { + hasMethod = inv.getMethodName().equals(methodsStr); + } else { + String[] methods = methodsStr.split(","); + for (String method : methods) { + if (inv.getMethodName().equals(method)) { + hasMethod = true; + break; } } - if (!hasMethod) { - logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() - + " not found in callback service interface ,invoke will be ignored." - + " please update the api interface. url is:" - + invoker.getUrl()) + " ,invocation is :" + inv); - return null; - } } - RpcContext rpcContext = RpcContext.getContext(); - rpcContext.setRemoteAddress(channel.getRemoteAddress()); - Result result = invoker.invoke(inv); - - if (result instanceof AsyncRpcResult) { - return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); - } else { - return CompletableFuture.completedFuture(result); + if (!hasMethod) { + logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + + " not found in callback service interface ,invoke will be ignored." + + " please update the api interface. url is:" + + invoker.getUrl()) + " ,invocation is :" + inv); + return null; } } - throw new RemotingException(channel, "Unsupported request: " - + (message == null ? null : (message.getClass().getName() + ": " + message)) - + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + RpcContext rpcContext = RpcContext.getContext(); + rpcContext.setRemoteAddress(channel.getRemoteAddress()); + Result result = invoker.invoke(inv); + + if (result instanceof AsyncRpcResult) { + return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); + + } else { + return CompletableFuture.completedFuture(result); + } } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); + } else { super.received(channel, message); } @@ -164,6 +173,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (method == null || method.length() == 0) { return null; } + RpcInvocation invocation = new RpcInvocation(method, new Class[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); @@ -172,6 +182,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } + return invocation; } }; @@ -185,6 +196,7 @@ public static DubboProtocol getDubboProtocol() { // load ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); } + return INSTANCE; } @@ -213,24 +225,26 @@ Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); + // if it's callback service on client side isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } + //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } - String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); if (exporter == null) { - throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + - exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); } return exporter.getInvoker(); @@ -264,6 +278,7 @@ public Exporter export(Invoker invoker) throws RpcException { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } + } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } @@ -271,6 +286,7 @@ public Exporter export(Invoker invoker) throws RpcException { openServer(url); optimizeSerialization(url); + return exporter; } @@ -313,6 +329,7 @@ private ExchangeServer createServer(URL url) { } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } + str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); @@ -320,6 +337,7 @@ private ExchangeServer createServer(URL url) { throw new RpcException("Unsupported client type: " + str); } } + return server; } @@ -348,10 +366,13 @@ private void optimizeSerialization(URL url) throws RpcException { } optimizers.add(className); + } catch (ClassNotFoundException e) { throw new RpcException("Cannot find the serialization optimizer class: " + className, e); + } catch (InstantiationException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); + } catch (IllegalAccessException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } @@ -360,65 +381,173 @@ private void optimizeSerialization(URL url) throws RpcException { @Override public Invoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); + // create rpc invoker. DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); invokers.add(invoker); + return invoker; } private ExchangeClient[] getClients(URL url) { // whether to share connection - boolean serviceShareConnect = false; + + boolean useShareConnect = false; + int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); + List shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { - serviceShareConnect = true; - connections = 1; + useShareConnect = true; + + /** + * The xml configuration should have a higher priority than properties. + */ + String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null); + connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY, + Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); + shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { - if (serviceShareConnect) { - clients[i] = getSharedClient(url); + if (useShareConnect) { + clients[i] = shareClients.get(i); + } else { clients[i] = initClient(url); } } + return clients; } /** * Get shared connection + * + * @param url + * @param connectNum connectNum must be greater than or equal to 1 */ - private ExchangeClient getSharedClient(URL url) { + private List getSharedClient(URL url, int connectNum) { String key = url.getAddress(); - ReferenceCountExchangeClient client = referenceClientMap.get(key); - if (client != null) { - if (!client.isClosed()) { - client.incrementAndGetCount(); - return client; - } else { - referenceClientMap.remove(key); - } + List clients = referenceClientMap.get(key); + + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { - if (referenceClientMap.containsKey(key)) { - return referenceClientMap.get(key); + clients = referenceClientMap.get(key); + // dubbo check + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; } - ExchangeClient exchangeClient = initClient(url); - client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); - referenceClientMap.put(key, client); - ghostClientMap.remove(key); + // connectNum must be greater than or equal to 1 + connectNum = Math.max(connectNum, 1); + + // If the clients is empty, then the first initialization is + if (CollectionUtils.isEmpty(clients)) { + clients = buildReferenceCountExchangeClientList(url, connectNum); + referenceClientMap.put(key, clients); + + } else { + for (int i = 0; i < clients.size(); i++) { + ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); + // If there is a client in the list that is no longer available, create a new one to replace him. + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + clients.set(i, buildReferenceCountExchangeClient(url)); + continue; + } + + referenceCountExchangeClient.incrementAndGetCount(); + } + } + + /** + * I understand that the purpose of the remove operation here is to avoid the expired url key + * always occupying this memory space. + */ locks.remove(key); - return client; + + return clients; + } + } + + /** + * Check if the client list is all available + * + * @param referenceCountExchangeClients + * @return true-available,false-unavailable + */ + private boolean checkClientCanUse(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return false; + } + + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + // As long as one client is not available, you need to replace the unavailable client with the available one. + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + return false; + } } + + return true; + } + + /** + * Add client references in bulk + * + * @param referenceCountExchangeClients + */ + private void batchClientRefIncr(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return; + } + + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + if (referenceCountExchangeClient != null) { + referenceCountExchangeClient.incrementAndGetCount(); + } + } + } + + /** + * Bulk build client + * + * @param url + * @param connectNum + * @return + */ + private List buildReferenceCountExchangeClientList(URL url, int connectNum) { + List clients = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < connectNum; i++) { + clients.add(buildReferenceCountExchangeClient(url)); + } + + return clients; + } + + /** + * Build a single client + * + * @param url + * @return + */ + private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { + ExchangeClient exchangeClient = initClient(url); + + return new ReferenceCountExchangeClient(exchangeClient); } /** * Create new connection + * + * @param url */ private ExchangeClient initClient(URL url) { @@ -440,12 +569,15 @@ private ExchangeClient initClient(URL url) { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); + } else { client = Exchangers.connect(url, requestHandler); } + } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } + return client; } @@ -453,46 +585,64 @@ private ExchangeClient initClient(URL url) { public void destroy() { for (String key : new ArrayList<>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); - if (server != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo server: " + server.getLocalAddress()); - } - server.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); + + if (server == null) { + continue; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo server: " + server.getLocalAddress()); } + + server.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } } for (String key : new ArrayList<>(referenceClientMap.keySet())) { - ExchangeClient client = referenceClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + List clients = referenceClientMap.remove(key); + + if (CollectionUtils.isEmpty(clients)) { + continue; } - } - for (String key : new ArrayList<>(ghostClientMap.keySet())) { - ExchangeClient client = ghostClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + for (ReferenceCountExchangeClient client : clients) { + closeReferenceCountExchangeClient(client); } } + stubServiceMethodsMap.clear(); super.destroy(); } + + /** + * close ReferenceCountExchangeClient + * + * @param client + */ + private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) { + if (client == null) { + return; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + } + + client.close(ConfigurationUtils.getServerShutdownTimeout()); + + // TODO + /** + * At this time, ReferenceCountExchangeClient#client has been replaced with LazyConnectExchangeClient. + * Do you need to call client.close again to ensure that LazyConnectExchangeClient is also closed? + */ + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java index eaebb1985f3..f2bc4534fdc 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java @@ -65,7 +65,6 @@ public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) { this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false); } - private void initClient() throws RemotingException { if (client != null) { return; diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index 834711515f8..7a720480463 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -26,7 +26,6 @@ import org.apache.dubbo.remoting.exchange.ResponseFuture; import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -38,19 +37,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0); - // private final ExchangeHandler handler; - private final ConcurrentMap ghostClientMap; private ExchangeClient client; - - public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) { + public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; referenceCount.incrementAndGet(); this.url = client.getUrl(); - if (ghostClientMap == null) { - throw new IllegalStateException("ghostClientMap can not be null, url: " + url); - } - this.ghostClientMap = ghostClientMap; } @Override @@ -151,10 +143,12 @@ public void close(int timeout) { if (referenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); + } else { client.close(timeout); } - client = replaceWithLazyClient(); + + replaceWithLazyClient(); } } @@ -163,8 +157,13 @@ public void startClose() { client.startClose(); } - // ghost client - private LazyConnectExchangeClient replaceWithLazyClient() { + /** + * when closing the client, the client needs to be set to LazyConnectExchangeClient, and if a new call is made, + * the client will "resurrect". + * + * @return + */ + private void replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE) @@ -173,14 +172,12 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - String key = url.getAddress(); - // in worst case there's only one ghost connection. - LazyConnectExchangeClient gclient = ghostClientMap.get(key); - if (gclient == null || gclient.isClosed()) { - gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); - ghostClientMap.put(key, gclient); + /** + * the order of judgment in the if statement cannot be changed. + */ + if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) { + client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); } - return gclient; } @Override @@ -192,3 +189,4 @@ public void incrementAndGetCount() { referenceCount.incrementAndGet(); } } + diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java index c34f6d9f148..b77e5e18b3c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java @@ -27,7 +27,6 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.ProxyFactory; import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -35,6 +34,12 @@ import org.junit.jupiter.api.Test; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + public class ReferenceCountExchangeClientTest { @@ -80,7 +85,7 @@ public void setUp() throws Exception { */ @Test public void test_share_connect() { - init(0); + init(0, 1); Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertEquals(demoClient, helloClient); destoy(); @@ -91,18 +96,43 @@ public void test_share_connect() { */ @Test public void test_not_share_connect() { - init(1); + init(1, 1); Assertions.assertNotSame(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertNotSame(demoClient, helloClient); destoy(); } + /** + * test using multiple shared connections + */ + @Test + public void test_mult_share_connect() { + // here a three shared connection is established between a consumer process and a provider process. + final int shareConnectionNum = 3; + + init(0, shareConnectionNum); + + List helloReferenceClientList = getReferenceClientList(helloServiceInvoker); + Assertions.assertEquals(shareConnectionNum, helloReferenceClientList.size()); + + List demoReferenceClientList = getReferenceClientList(demoServiceInvoker); + Assertions.assertEquals(shareConnectionNum, demoReferenceClientList.size()); + + // because helloServiceInvoker and demoServiceInvoker use share connect, so client list must be equal + Assertions.assertTrue(Objects.equals(helloReferenceClientList, demoReferenceClientList)); + + Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); + Assertions.assertEquals(demoClient, helloClient); + + destoy(); + } + /** * test counter won't count down incorrectly when invoker is destroyed for multiple times */ @Test public void test_multi_destory() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); demoServiceInvoker.destroy(); @@ -119,16 +149,19 @@ public void test_multi_destory() { */ @Test public void test_counter_error() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); + // because the two interfaces are initialized, the ReferenceCountExchangeClient reference counter is 2 ReferenceCountExchangeClient client = getReferenceClient(helloServiceInvoker); + // close once, counter counts down from 2 to 1, no warning occurs client.close(); Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(0, LogUtil.findMessage(errorMsg), "should not warning message"); - // counter is incorrect, invocation still succeeds + + // generally a client can only be closed once, here it is closed twice, counter is incorrect client.close(); // wait close done. @@ -138,6 +171,7 @@ public void test_counter_error() { Assertions.fail(); } + // due to the effect of LazyConnectExchangeClient, the client will be "revived" whenever there is a call. Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(1, LogUtil.findMessage(errorMsg), "should warning message"); @@ -150,7 +184,17 @@ public void test_counter_error() { // status switch to available once invoke again Assertions.assertEquals(true, helloServiceInvoker.isAvailable(), "client status available"); + /** + * This is the third time to close the same client. Under normal circumstances, + * a client value should be closed once (that is, the shutdown operation is irreversible). + * After closing, the value of the reference counter of the client has become -1. + * + * But this is a bit special, because after the client is closed twice, there are several calls to helloService, + * that is, the client inside the ReferenceCountExchangeClient is actually active, so the third shutdown here is still effective, + * let the resurrection After the client is really closed. + */ client.close(); + // client has been replaced with lazy client. lazy client is fetched from referenceclientmap, and since it's // been invoked once, it's close status is false Assertions.assertEquals(false, client.isClosed(), "client status close"); @@ -159,10 +203,13 @@ public void test_counter_error() { } @SuppressWarnings("unchecked") - private void init(int connections) { + private void init(int connections, int shareConnections) { + Assertions.assertTrue(connections >= 0); + Assertions.assertTrue(shareConnections >= 1); + int port = NetUtils.getAvailablePort(); - URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections); - URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections); + URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); + URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); demoExporter = export(new DemoServiceImpl(), IDemoService.class, demoUrl); helloExporter = export(new HelloServiceImpl(), IHelloService.class, helloUrl); @@ -204,17 +251,42 @@ private ExchangeClient getClient(Invoker invoker) { } private ReferenceCountExchangeClient getReferenceClient(Invoker invoker) { - return (ReferenceCountExchangeClient) getInvokerClient(invoker); + return getReferenceClientList(invoker).get(0); + } + + private List getReferenceClientList(Invoker invoker) { + List invokerClientList = getInvokerClientList(invoker); + + List referenceCountExchangeClientList = new ArrayList<>(invokerClientList.size()); + for (ExchangeClient exchangeClient : invokerClientList) { + Assertions.assertTrue(exchangeClient instanceof ReferenceCountExchangeClient); + referenceCountExchangeClientList.add((ReferenceCountExchangeClient) exchangeClient); + } + + return referenceCountExchangeClientList; } private ExchangeClient getInvokerClient(Invoker invoker) { + return getInvokerClientList(invoker).get(0); + } + + private List getInvokerClientList(Invoker invoker) { @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker) invoker; try { Field clientField = DubboInvoker.class.getDeclaredField("clients"); clientField.setAccessible(true); ExchangeClient[] clients = (ExchangeClient[]) clientField.get(dInvoker); - return clients[0]; + + List clientList = new ArrayList(clients.length); + for (ExchangeClient client : clients) { + clientList.add(client); + } + + // sorting makes it easy to compare between lists + Collections.sort(clientList, Comparator.comparing(c -> Integer.valueOf(Objects.hashCode(c)))); + + return clientList; } catch (Exception e) { e.printStackTrace();