diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 948082991c7..3bab435eb6c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -143,7 +143,13 @@ public class Constants { public static final int DEFAULT_ALIVE = 60 * 1000; - public static final int DEFAULT_CONNECTIONS = 0; + /** + * By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set), + * which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection. + */ + public static final String DEFAULT_SHARE_CONNECTIONS = "1"; + + public static final String SHARE_CONNECTIONS_KEY = "shareconnections"; public static final int DEFAULT_ACCEPTS = 0; diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java index ce106550601..3a556e7f8c4 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java @@ -57,6 +57,12 @@ public class ConsumerConfig extends AbstractReferenceConfig { */ private Integer queues; + /** + * By default, a TCP long-connection communication is shared between the consumer process and the provider process. + * This property can be set to share multiple TCP long-connection communications. Note that only the dubbo protocol takes effect. + */ + private Integer shareconnections; + @Override public void setTimeout(Integer timeout) { super.setTimeout(timeout); @@ -118,4 +124,12 @@ public Integer getQueues() { public void setQueues(Integer queues) { this.queues = queues; } + + public Integer getShareconnections() { + return shareconnections; + } + + public void setShareconnections(Integer shareconnections) { + this.shareconnections = shareconnections; + } } diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd index c5b890c7fc4..41af239deb5 100644 --- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd +++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd @@ -891,6 +891,12 @@ + + + + + + diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 4138398f235..2afdc4d0f02 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -174,17 +174,22 @@ public void send(Object message, boolean sent) throws RemotingException { } protected void connect() throws RemotingException { + connectLock.lock(); + try { + if (isConnected()) { return; } doConnect(); + if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms."); + } else { if (logger.isInfoEnabled()) { logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " @@ -192,12 +197,15 @@ protected void connect() throws RemotingException { + ", channel is " + this.getChannel()); } } + } catch (RemotingException e) { throw e; + } catch (Throwable e) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); + } finally { connectLock.unlock(); } @@ -241,11 +249,13 @@ public void reconnect() throws RemotingException { @Override public void close() { + try { super.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { if (executor != null) { ExecutorUtil.shutdownNow(executor, 100); @@ -253,11 +263,13 @@ public void close() { } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { disconnect(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } + try { doClose(); } catch (Throwable e) { @@ -310,5 +322,4 @@ public String toString() { * @return channel */ protected abstract Channel getChannel(); - } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index e9f42dd56f4..f284dd40dd6 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -22,7 +22,9 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; @@ -49,11 +51,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * dubbo protocol support. @@ -65,6 +69,7 @@ public class DubboProtocol extends AbstractProtocol { public static final int DEFAULT_PORT = 20880; private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; private static DubboProtocol INSTANCE; + /** * */ @@ -72,8 +77,7 @@ public class DubboProtocol extends AbstractProtocol { /** * */ - private final Map referenceClientMap = new ConcurrentHashMap<>(); - private final ConcurrentMap ghostClientMap = new ConcurrentHashMap<>(); + private final Map> referenceClientMap = new ConcurrentHashMap<>(); private final ConcurrentMap locks = new ConcurrentHashMap<>(); private final Set optimizers = new ConcurrentHashSet<>(); /** @@ -81,55 +85,60 @@ public class DubboProtocol extends AbstractProtocol { * servicekey-stubmethods */ private final ConcurrentMap stubServiceMethodsMap = new ConcurrentHashMap<>(); + private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - Invocation inv = (Invocation) message; - Invoker invoker = getInvoker(channel, inv); - // need to consider backward-compatibility if it's a callback - if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { - String methodsStr = invoker.getUrl().getParameters().get("methods"); - boolean hasMethod = false; - if (methodsStr == null || !methodsStr.contains(",")) { - hasMethod = inv.getMethodName().equals(methodsStr); - } else { - String[] methods = methodsStr.split(","); - for (String method : methods) { - if (inv.getMethodName().equals(method)) { - hasMethod = true; - break; - } + + if (!(message instanceof Invocation)) { + throw new RemotingException(channel, "Unsupported request: " + + (message == null ? null : (message.getClass().getName() + ": " + message)) + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + } + + Invocation inv = (Invocation) message; + Invoker invoker = getInvoker(channel, inv); + // need to consider backward-compatibility if it's a callback + if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { + String methodsStr = invoker.getUrl().getParameters().get("methods"); + boolean hasMethod = false; + if (methodsStr == null || !methodsStr.contains(",")) { + hasMethod = inv.getMethodName().equals(methodsStr); + } else { + String[] methods = methodsStr.split(","); + for (String method : methods) { + if (inv.getMethodName().equals(method)) { + hasMethod = true; + break; } } - if (!hasMethod) { - logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() - + " not found in callback service interface ,invoke will be ignored." - + " please update the api interface. url is:" - + invoker.getUrl()) + " ,invocation is :" + inv); - return null; - } } - RpcContext rpcContext = RpcContext.getContext(); - rpcContext.setRemoteAddress(channel.getRemoteAddress()); - Result result = invoker.invoke(inv); - - if (result instanceof AsyncRpcResult) { - return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); - } else { - return CompletableFuture.completedFuture(result); + if (!hasMethod) { + logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + + " not found in callback service interface ,invoke will be ignored." + + " please update the api interface. url is:" + + invoker.getUrl()) + " ,invocation is :" + inv); + return null; } } - throw new RemotingException(channel, "Unsupported request: " - + (message == null ? null : (message.getClass().getName() + ": " + message)) - + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + RpcContext rpcContext = RpcContext.getContext(); + rpcContext.setRemoteAddress(channel.getRemoteAddress()); + Result result = invoker.invoke(inv); + + if (result instanceof AsyncRpcResult) { + return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); + + } else { + return CompletableFuture.completedFuture(result); + } } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); + } else { super.received(channel, message); } @@ -164,6 +173,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (method == null || method.length() == 0) { return null; } + RpcInvocation invocation = new RpcInvocation(method, new Class[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); @@ -172,6 +182,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } + return invocation; } }; @@ -185,6 +196,7 @@ public static DubboProtocol getDubboProtocol() { // load ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); } + return INSTANCE; } @@ -213,24 +225,26 @@ Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); + // if it's callback service on client side isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } + //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } - String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); if (exporter == null) { - throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + - exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); } return exporter.getInvoker(); @@ -264,6 +278,7 @@ public Exporter export(Invoker invoker) throws RpcException { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } + } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } @@ -271,6 +286,7 @@ public Exporter export(Invoker invoker) throws RpcException { openServer(url); optimizeSerialization(url); + return exporter; } @@ -313,6 +329,7 @@ private ExchangeServer createServer(URL url) { } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } + str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); @@ -320,6 +337,7 @@ private ExchangeServer createServer(URL url) { throw new RpcException("Unsupported client type: " + str); } } + return server; } @@ -348,10 +366,13 @@ private void optimizeSerialization(URL url) throws RpcException { } optimizers.add(className); + } catch (ClassNotFoundException e) { throw new RpcException("Cannot find the serialization optimizer class: " + className, e); + } catch (InstantiationException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); + } catch (IllegalAccessException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } @@ -360,65 +381,173 @@ private void optimizeSerialization(URL url) throws RpcException { @Override public Invoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); + // create rpc invoker. DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); invokers.add(invoker); + return invoker; } private ExchangeClient[] getClients(URL url) { // whether to share connection - boolean serviceShareConnect = false; + + boolean useShareConnect = false; + int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); + List shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { - serviceShareConnect = true; - connections = 1; + useShareConnect = true; + + /** + * The xml configuration should have a higher priority than properties. + */ + String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null); + connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY, + Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); + shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { - if (serviceShareConnect) { - clients[i] = getSharedClient(url); + if (useShareConnect) { + clients[i] = shareClients.get(i); + } else { clients[i] = initClient(url); } } + return clients; } /** * Get shared connection + * + * @param url + * @param connectNum connectNum must be greater than or equal to 1 */ - private ExchangeClient getSharedClient(URL url) { + private List getSharedClient(URL url, int connectNum) { String key = url.getAddress(); - ReferenceCountExchangeClient client = referenceClientMap.get(key); - if (client != null) { - if (!client.isClosed()) { - client.incrementAndGetCount(); - return client; - } else { - referenceClientMap.remove(key); - } + List clients = referenceClientMap.get(key); + + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { - if (referenceClientMap.containsKey(key)) { - return referenceClientMap.get(key); + clients = referenceClientMap.get(key); + // dubbo check + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; } - ExchangeClient exchangeClient = initClient(url); - client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); - referenceClientMap.put(key, client); - ghostClientMap.remove(key); + // connectNum must be greater than or equal to 1 + connectNum = Math.max(connectNum, 1); + + // If the clients is empty, then the first initialization is + if (CollectionUtils.isEmpty(clients)) { + clients = buildReferenceCountExchangeClientList(url, connectNum); + referenceClientMap.put(key, clients); + + } else { + for (int i = 0; i < clients.size(); i++) { + ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); + // If there is a client in the list that is no longer available, create a new one to replace him. + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + clients.set(i, buildReferenceCountExchangeClient(url)); + continue; + } + + referenceCountExchangeClient.incrementAndGetCount(); + } + } + + /** + * I understand that the purpose of the remove operation here is to avoid the expired url key + * always occupying this memory space. + */ locks.remove(key); - return client; + + return clients; + } + } + + /** + * Check if the client list is all available + * + * @param referenceCountExchangeClients + * @return true-available,false-unavailable + */ + private boolean checkClientCanUse(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return false; + } + + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + // As long as one client is not available, you need to replace the unavailable client with the available one. + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + return false; + } } + + return true; + } + + /** + * Add client references in bulk + * + * @param referenceCountExchangeClients + */ + private void batchClientRefIncr(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return; + } + + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + if (referenceCountExchangeClient != null) { + referenceCountExchangeClient.incrementAndGetCount(); + } + } + } + + /** + * Bulk build client + * + * @param url + * @param connectNum + * @return + */ + private List buildReferenceCountExchangeClientList(URL url, int connectNum) { + List clients = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < connectNum; i++) { + clients.add(buildReferenceCountExchangeClient(url)); + } + + return clients; + } + + /** + * Build a single client + * + * @param url + * @return + */ + private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { + ExchangeClient exchangeClient = initClient(url); + + return new ReferenceCountExchangeClient(exchangeClient); } /** * Create new connection + * + * @param url */ private ExchangeClient initClient(URL url) { @@ -440,12 +569,15 @@ private ExchangeClient initClient(URL url) { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); + } else { client = Exchangers.connect(url, requestHandler); } + } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } + return client; } @@ -453,46 +585,64 @@ private ExchangeClient initClient(URL url) { public void destroy() { for (String key : new ArrayList<>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); - if (server != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo server: " + server.getLocalAddress()); - } - server.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); + + if (server == null) { + continue; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo server: " + server.getLocalAddress()); } + + server.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } } for (String key : new ArrayList<>(referenceClientMap.keySet())) { - ExchangeClient client = referenceClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + List clients = referenceClientMap.remove(key); + + if (CollectionUtils.isEmpty(clients)) { + continue; } - } - for (String key : new ArrayList<>(ghostClientMap.keySet())) { - ExchangeClient client = ghostClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + for (ReferenceCountExchangeClient client : clients) { + closeReferenceCountExchangeClient(client); } } + stubServiceMethodsMap.clear(); super.destroy(); } + + /** + * close ReferenceCountExchangeClient + * + * @param client + */ + private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) { + if (client == null) { + return; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + } + + client.close(ConfigurationUtils.getServerShutdownTimeout()); + + // TODO + /** + * At this time, ReferenceCountExchangeClient#client has been replaced with LazyConnectExchangeClient. + * Do you need to call client.close again to ensure that LazyConnectExchangeClient is also closed? + */ + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java index eaebb1985f3..f2bc4534fdc 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java @@ -65,7 +65,6 @@ public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) { this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false); } - private void initClient() throws RemotingException { if (client != null) { return; diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index 834711515f8..7a720480463 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -26,7 +26,6 @@ import org.apache.dubbo.remoting.exchange.ResponseFuture; import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -38,19 +37,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0); - // private final ExchangeHandler handler; - private final ConcurrentMap ghostClientMap; private ExchangeClient client; - - public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) { + public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; referenceCount.incrementAndGet(); this.url = client.getUrl(); - if (ghostClientMap == null) { - throw new IllegalStateException("ghostClientMap can not be null, url: " + url); - } - this.ghostClientMap = ghostClientMap; } @Override @@ -151,10 +143,12 @@ public void close(int timeout) { if (referenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); + } else { client.close(timeout); } - client = replaceWithLazyClient(); + + replaceWithLazyClient(); } } @@ -163,8 +157,13 @@ public void startClose() { client.startClose(); } - // ghost client - private LazyConnectExchangeClient replaceWithLazyClient() { + /** + * when closing the client, the client needs to be set to LazyConnectExchangeClient, and if a new call is made, + * the client will "resurrect". + * + * @return + */ + private void replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE) @@ -173,14 +172,12 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - String key = url.getAddress(); - // in worst case there's only one ghost connection. - LazyConnectExchangeClient gclient = ghostClientMap.get(key); - if (gclient == null || gclient.isClosed()) { - gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); - ghostClientMap.put(key, gclient); + /** + * the order of judgment in the if statement cannot be changed. + */ + if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) { + client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); } - return gclient; } @Override @@ -192,3 +189,4 @@ public void incrementAndGetCount() { referenceCount.incrementAndGet(); } } + diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java index c34f6d9f148..b77e5e18b3c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java @@ -27,7 +27,6 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.ProxyFactory; import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -35,6 +34,12 @@ import org.junit.jupiter.api.Test; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + public class ReferenceCountExchangeClientTest { @@ -80,7 +85,7 @@ public void setUp() throws Exception { */ @Test public void test_share_connect() { - init(0); + init(0, 1); Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertEquals(demoClient, helloClient); destoy(); @@ -91,18 +96,43 @@ public void test_share_connect() { */ @Test public void test_not_share_connect() { - init(1); + init(1, 1); Assertions.assertNotSame(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertNotSame(demoClient, helloClient); destoy(); } + /** + * test using multiple shared connections + */ + @Test + public void test_mult_share_connect() { + // here a three shared connection is established between a consumer process and a provider process. + final int shareConnectionNum = 3; + + init(0, shareConnectionNum); + + List helloReferenceClientList = getReferenceClientList(helloServiceInvoker); + Assertions.assertEquals(shareConnectionNum, helloReferenceClientList.size()); + + List demoReferenceClientList = getReferenceClientList(demoServiceInvoker); + Assertions.assertEquals(shareConnectionNum, demoReferenceClientList.size()); + + // because helloServiceInvoker and demoServiceInvoker use share connect, so client list must be equal + Assertions.assertTrue(Objects.equals(helloReferenceClientList, demoReferenceClientList)); + + Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); + Assertions.assertEquals(demoClient, helloClient); + + destoy(); + } + /** * test counter won't count down incorrectly when invoker is destroyed for multiple times */ @Test public void test_multi_destory() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); demoServiceInvoker.destroy(); @@ -119,16 +149,19 @@ public void test_multi_destory() { */ @Test public void test_counter_error() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); + // because the two interfaces are initialized, the ReferenceCountExchangeClient reference counter is 2 ReferenceCountExchangeClient client = getReferenceClient(helloServiceInvoker); + // close once, counter counts down from 2 to 1, no warning occurs client.close(); Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(0, LogUtil.findMessage(errorMsg), "should not warning message"); - // counter is incorrect, invocation still succeeds + + // generally a client can only be closed once, here it is closed twice, counter is incorrect client.close(); // wait close done. @@ -138,6 +171,7 @@ public void test_counter_error() { Assertions.fail(); } + // due to the effect of LazyConnectExchangeClient, the client will be "revived" whenever there is a call. Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(1, LogUtil.findMessage(errorMsg), "should warning message"); @@ -150,7 +184,17 @@ public void test_counter_error() { // status switch to available once invoke again Assertions.assertEquals(true, helloServiceInvoker.isAvailable(), "client status available"); + /** + * This is the third time to close the same client. Under normal circumstances, + * a client value should be closed once (that is, the shutdown operation is irreversible). + * After closing, the value of the reference counter of the client has become -1. + * + * But this is a bit special, because after the client is closed twice, there are several calls to helloService, + * that is, the client inside the ReferenceCountExchangeClient is actually active, so the third shutdown here is still effective, + * let the resurrection After the client is really closed. + */ client.close(); + // client has been replaced with lazy client. lazy client is fetched from referenceclientmap, and since it's // been invoked once, it's close status is false Assertions.assertEquals(false, client.isClosed(), "client status close"); @@ -159,10 +203,13 @@ public void test_counter_error() { } @SuppressWarnings("unchecked") - private void init(int connections) { + private void init(int connections, int shareConnections) { + Assertions.assertTrue(connections >= 0); + Assertions.assertTrue(shareConnections >= 1); + int port = NetUtils.getAvailablePort(); - URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections); - URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections); + URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); + URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); demoExporter = export(new DemoServiceImpl(), IDemoService.class, demoUrl); helloExporter = export(new HelloServiceImpl(), IHelloService.class, helloUrl); @@ -204,17 +251,42 @@ private ExchangeClient getClient(Invoker invoker) { } private ReferenceCountExchangeClient getReferenceClient(Invoker invoker) { - return (ReferenceCountExchangeClient) getInvokerClient(invoker); + return getReferenceClientList(invoker).get(0); + } + + private List getReferenceClientList(Invoker invoker) { + List invokerClientList = getInvokerClientList(invoker); + + List referenceCountExchangeClientList = new ArrayList<>(invokerClientList.size()); + for (ExchangeClient exchangeClient : invokerClientList) { + Assertions.assertTrue(exchangeClient instanceof ReferenceCountExchangeClient); + referenceCountExchangeClientList.add((ReferenceCountExchangeClient) exchangeClient); + } + + return referenceCountExchangeClientList; } private ExchangeClient getInvokerClient(Invoker invoker) { + return getInvokerClientList(invoker).get(0); + } + + private List getInvokerClientList(Invoker invoker) { @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker) invoker; try { Field clientField = DubboInvoker.class.getDeclaredField("clients"); clientField.setAccessible(true); ExchangeClient[] clients = (ExchangeClient[]) clientField.get(dInvoker); - return clients[0]; + + List clientList = new ArrayList(clients.length); + for (ExchangeClient client : clients) { + clientList.add(client); + } + + // sorting makes it easy to compare between lists + Collections.sort(clientList, Comparator.comparing(c -> Integer.valueOf(Objects.hashCode(c)))); + + return clientList; } catch (Exception e) { e.printStackTrace();