Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detach Transport from TransportService #31727

Merged
merged 5 commits into from
Jul 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testScheduledPing() throws Exception {
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));

serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
Expand All @@ -104,7 +104,7 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha

int rounds = scaledRandomIntBetween(100, 5000);
for (int i = 0; i < rounds; i++) {
serviceB.submitRequest(nodeA, "sayHello",
serviceB.submitRequest(nodeA, "internal:sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -93,13 +94,20 @@ public void close() {
abstract void handleTransportDisconnect(DiscoveryNode node);

private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("failed to handle transport disconnect for node: {}", node);
}

@Override
protected void doRun() {
handleTransportDisconnect(node);
}
};
threadPool.generic().execute(runnable);
}
}

Expand Down
137 changes: 111 additions & 26 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
Expand Down Expand Up @@ -98,10 +99,10 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -205,7 +206,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

private volatile TransportService transportService;
private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();

private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
Expand All @@ -225,12 +226,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final ConnectionProfile defaultConnectionProfile;

private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final AtomicLong requestIdGenerator = new AtomicLong();
private final CounterMetric numHandshakes = new CounterMetric();
private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";

private final MeanMetric readBytesMetric = new MeanMetric();
private final MeanMetric transmittedBytesMetric = new MeanMetric();
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final ResponseHandlers responseHandlers = new ResponseHandlers();

public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
Expand Down Expand Up @@ -287,18 +289,28 @@ protected void doStart() {
}
}

@Override
public void addConnectionListener(TransportConnectionListener listener) {
transportListener.listeners.add(listener);
}

@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transportListener.listeners.remove(listener);
}

@Override
public CircuitBreaker getInFlightRequestBreaker() {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}

@Override
public void setTransportService(TransportService service) {
if (service.getRequestHandler(HANDSHAKE_ACTION_NAME) != null) {
throw new IllegalStateException(HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered");
public synchronized <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
if (requestHandlers.containsKey(reg.getAction())) {
throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
}
this.transportService = service;
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}

private static class HandshakeResponseHandler implements TransportResponseHandler<VersionHandshakeResponse> {
Expand Down Expand Up @@ -482,7 +494,7 @@ public void close() {
boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
CloseableChannel.closeChannels(channels, block);
} finally {
transportService.onConnectionClosed(this);
transportListener.onConnectionClosed(this);
}
}
}
Expand Down Expand Up @@ -538,7 +550,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
logger.debug("connected to node [{}]", node);
}
try {
transportService.onNodeConnected(node);
transportListener.onNodeConnected(node);
} finally {
if (nodeChannels.isClosed()) {
// we got closed concurrently due to a disconnect or some other event on the channel.
Expand All @@ -550,7 +562,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
// try to remove it first either way one of the two wins even if the callback has run before we even added the
// tuple to the map since in that case we remove it here again
if (connectedNodes.remove(node, nodeChannels)) {
transportService.onNodeDisconnected(node);
transportListener.onNodeDisconnected(node);
}
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
Expand Down Expand Up @@ -652,7 +664,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
// At this point we should construct the connection, notify the transport service, and attach close listeners to the
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
transportService.onConnectionOpened(nodeChannels);
transportListener.onConnectionOpened(nodeChannels);
final NodeChannels finalNodeChannels = nodeChannels;
final AtomicBoolean runOnce = new AtomicBoolean(false);
Consumer<TcpChannel> onClose = c -> {
Expand Down Expand Up @@ -695,7 +707,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
if (closeLock.readLock().tryLock()) {
try {
if (connectedNodes.remove(node, nodeChannels)) {
transportService.onNodeDisconnected(node);
transportListener.onNodeDisconnected(node);
}
} finally {
closeLock.readLock().unlock();
Expand All @@ -722,7 +734,7 @@ public void disconnectFromNode(DiscoveryNode node) {
} finally {
closeLock.readLock().unlock();
if (nodeChannels != null) { // if we found it and removed it we close and notify
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportService.onNodeDisconnected(node));
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportListener.onNodeDisconnected(node));
}
}
}
Expand Down Expand Up @@ -979,7 +991,7 @@ protected final void doStop() {
Map.Entry<DiscoveryNode, NodeChannels> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
transportService.onNodeDisconnected(next.getKey());
transportListener.onNodeDisconnected(next.getKey());
} finally {
iterator.remove();
}
Expand Down Expand Up @@ -1133,7 +1145,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(channel, stream,
() -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length());
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(channel, message, onRequestSent);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1187,7 +1199,7 @@ public void sendErrorResponse(
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(channel, null,
() -> transportService.onResponseSent(requestId, action, error), message.length());
() -> transportListener.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
Expand Down Expand Up @@ -1236,7 +1248,7 @@ private void sendResponse(
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(channel, stream,
() -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length());
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1492,7 +1504,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = transportService.onResponseReceived(requestId);
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
Expand Down Expand Up @@ -1599,15 +1611,15 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
features = Collections.emptySet();
}
final String action = stream.readString();
transportService.onRequestReceived(requestId, action);
transportListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
sendResponse(version, features, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
TransportStatus.setHandshake((byte) 0));
} else {
final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
final RequestHandlerRegistry reg = getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
Expand Down Expand Up @@ -1714,7 +1726,7 @@ public void writeTo(StreamOutput out) throws IOException {
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout)
throws IOException, InterruptedException {
numHandshakes.inc();
final long requestId = newRequestId();
final long requestId = responseHandlers.newRequestId();
final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel);
AtomicReference<Version> versionRef = handler.versionRef;
AtomicReference<Exception> exceptionRef = handler.exceptionRef;
Expand Down Expand Up @@ -1764,11 +1776,6 @@ final long getNumHandshakes() {
return numHandshakes.count(); // for testing
}

@Override
public long newRequestId() {
return requestIdGenerator.incrementAndGet();
}

/**
* Called once the channel is closed for instance due to a disconnect or a closed socket etc.
*/
Expand Down Expand Up @@ -1912,4 +1919,82 @@ public ProfileSettings(Settings settings, String profileName) {
PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
}
}

private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onRequestReceived(long requestId, String action) {
for (TransportConnectionListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportConnectionListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}

@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}

@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}

@Override
public void onConnectionOpened(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(nodeChannels);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionClosed(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(nodeChannels);
}
}

@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportConnectionListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}

@Override
public final ResponseHandlers getResponseHandlers() {
return responseHandlers;
}

@Override
public final RequestHandlerRegistry getRequestHandler(String action) {
return requestHandlers.get(action);
}
}
Loading