Skip to content

Commit

Permalink
Fix triple client connection shareing race condition (#14718)
Browse files Browse the repository at this point in the history
* fix triple client connection management issue, #14717, #14716

* fix triple client connection management issue, #14717, #14716

* add comment

* polish warning log format

* polish warning log format

* Make stub implement Destroyable

* format code
  • Loading branch information
chickenlj authored Sep 27, 2024
1 parent 5c011be commit e1cfc03
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.model.StubServiceDescriptor;
import org.apache.dubbo.rpc.service.Destroyable;
import org.apache.dubbo.rpc.stub.BiStreamMethodHandler;
import org.apache.dubbo.rpc.stub.ServerStreamMethodHandler;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
Expand Down Expand Up @@ -130,13 +131,18 @@ public final class {{className}} {
{{/biStreamingWithoutClientStreamMethods}}
}

public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{
public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}, Destroyable {
private final Invoker<{{interfaceClassName}}> invoker;

public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) {
this.invoker = invoker;
}

@Override
public void $destroy() {
invoker.destroy();
}

{{#unaryMethods}}
{{#javaDoc}}
{{{javaDoc}}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ public final void increase() {
/**
* Increments the reference count by 1.
*/
public final AbstractConnectionClient retain() {
public final boolean retain() {
long oldCount = COUNTER_UPDATER.getAndIncrement(this);
if (oldCount <= 0) {
COUNTER_UPDATER.getAndDecrement(this);
throw new AssertionError("This instance has been destroyed");
logger.info(
"Retain failed, because connection " + remote
+ " has been destroyed but not yet removed, will create a new one instead."
+ " Check logs below to confirm that this connection finally gets removed to make sure there's no potential memory leak!");
return false;
}
return this;
return true;
}

/**
Expand All @@ -77,6 +81,7 @@ public boolean release() {
long remainingCount = COUNTER_UPDATER.decrementAndGet(this);

if (remainingCount == 0) {
logger.info("Destroying connection to {}, because the reference count reaches 0", remote);
destroy();
return true;
} else if (remainingCount <= -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.model.FrameworkModel;
Expand All @@ -26,6 +28,9 @@
import java.util.function.Consumer;

public class SingleProtocolConnectionManager implements ConnectionManager {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class);

public static final String NAME = "single";

private final ConcurrentMap<String, AbstractConnectionClient> connections = new ConcurrentHashMap<>(16);
Expand All @@ -42,21 +47,34 @@ public AbstractConnectionClient connect(URL url, ChannelHandler handler) {
throw new IllegalArgumentException("url == null");
}
return connections.compute(url.getAddress(), (address, conn) -> {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
if (conn == null) {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
ConnectionManager manager = frameworkModel
.getExtensionLoader(ConnectionManager.class)
.getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> connections.remove(address, connectionClient));
return connectionClient;
return createAbstractConnectionClient(url, handler, address, transport);
} else {
conn.retain();
boolean shouldReuse = conn.retain();
if (!shouldReuse) {
logger.info("Trying to create a new connection for {}.", address);
return createAbstractConnectionClient(url, handler, address, transport);
}
return conn;
}
});
}

private AbstractConnectionClient createAbstractConnectionClient(
URL url, ChannelHandler handler, String address, String transport) {
ConnectionManager manager =
frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> {
logger.info(
"Remove closed connection (with reference count==0) for address {}, a new one will be created for upcoming RPC requests routing to this address.",
address);
connections.remove(address, connectionClient);
});
return connectionClient;
}

@Override
public void forEachConnection(Consumer<AbstractConnectionClient> connectionConsumer) {
connections.values().forEach(connectionConsumer);
Expand Down

0 comments on commit e1cfc03

Please sign in to comment.