diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java index e3dcbad024cb2..14e2365eb7e82 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java @@ -79,7 +79,7 @@ public void closeFromSelector() throws IOException { selector.assertOnSelectorThread(); if (closeContext.isDone() == false) { try { - closeRawChannel(); + socketChannel.close(); closeContext.complete(null); } catch (IOException e) { closeContext.completeExceptionally(e); @@ -119,13 +119,13 @@ public void addCloseListener(BiConsumer listener) { closeContext.whenComplete(listener); } + @Override + public void close() { + getContext().closeChannel(); + } + // Package visibility for testing void setSelectionKey(SelectionKey selectionKey) { this.selectionKey = selectionKey; } - // Package visibility for testing - - void closeRawChannel() throws IOException { - socketChannel.close(); - } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java index a5727d9ef597a..eb5194f21ef3b 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java @@ -67,7 +67,7 @@ protected void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOE ChannelFactory channelFactory = nioServerChannel.getChannelFactory(); SocketSelector selector = selectorSupplier.get(); NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector); - nioServerChannel.getAcceptContext().accept(nioSocketChannel); + nioServerChannel.getContext().acceptChannel(nioSocketChannel); } /** diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java index 893c6986bdda7..5d77675aa4819 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java @@ -26,25 +26,20 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -public class BytesChannelContext implements ChannelContext { +public class BytesChannelContext extends SocketChannelContext { - private final NioSocketChannel channel; private final ReadConsumer readConsumer; private final InboundChannelBuffer channelBuffer; private final LinkedList queued = new LinkedList<>(); private final AtomicBoolean isClosing = new AtomicBoolean(false); - private boolean peerClosed = false; - private boolean ioException = false; - public BytesChannelContext(NioSocketChannel channel, ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) { - this.channel = channel; + public BytesChannelContext(NioSocketChannel channel, BiConsumer exceptionHandler, + ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) { + super(channel, exceptionHandler); this.readConsumer = readConsumer; this.channelBuffer = channelBuffer; } - @Override - public void channelRegistered() throws IOException {} - @Override public int read() throws IOException { if (channelBuffer.getRemaining() == 0) { @@ -52,16 +47,9 @@ public int read() throws IOException { channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1); } - int bytesRead; - try { - bytesRead = channel.read(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex())); - } catch (IOException ex) { - ioException = true; - throw ex; - } + int bytesRead = readFromChannel(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex())); - if (bytesRead == -1) { - peerClosed = true; + if (bytesRead == 0) { return 0; } @@ -90,7 +78,6 @@ public void sendMessage(ByteBuffer[] buffers, BiConsumer listen return; } - // TODO: Eval if we will allow writes from sendMessage selector.queueWriteInChannelBuffer(writeOperation); } @@ -126,28 +113,38 @@ public void closeChannel() { @Override public boolean selectorShouldClose() { - return peerClosed || ioException || isClosing.get(); + return isPeerClosed() || hasIOException() || isClosing.get(); } @Override - public void closeFromSelector() { + public void closeFromSelector() throws IOException { channel.getSelector().assertOnSelectorThread(); - // Set to true in order to reject new writes before queuing with selector - isClosing.set(true); - channelBuffer.close(); - for (BytesWriteOperation op : queued) { - channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException()); + if (channel.isOpen()) { + IOException channelCloseException = null; + try { + channel.closeFromSelector(); + } catch (IOException e) { + channelCloseException = e; + } + // Set to true in order to reject new writes before queuing with selector + isClosing.set(true); + channelBuffer.close(); + for (BytesWriteOperation op : queued) { + channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException()); + } + queued.clear(); + if (channelCloseException != null) { + throw channelCloseException; + } } - queued.clear(); } private void singleFlush(BytesWriteOperation headOp) throws IOException { try { - int written = channel.write(headOp.getBuffersToWrite()); + int written = flushToChannel(headOp.getBuffersToWrite()); headOp.incrementIndex(written); } catch (IOException e) { channel.getSelector().executeFailedListener(headOp.getListener(), e); - ioException = true; throw e; } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java index 10afd53621dd8..fa664484c1c59 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java @@ -20,62 +20,26 @@ package org.elasticsearch.nio; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.function.BiConsumer; -/** - * This context should implement the specific logic for a channel. When a channel receives a notification - * that it is ready to perform certain operations (read, write, etc) the {@link ChannelContext} will be - * called. This context will need to implement all protocol related logic. Additionally, if any special - * close behavior is required, it should be implemented in this context. - * - * The only methods of the context that should ever be called from a non-selector thread are - * {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}. - */ public interface ChannelContext { - - void channelRegistered() throws IOException; - - int read() throws IOException; - - void sendMessage(ByteBuffer[] buffers, BiConsumer listener); - - void queueWriteOperation(WriteOperation writeOperation); - - void flushChannel() throws IOException; - - boolean hasQueuedWriteOps(); + /** + * This method cleans up any context resources that need to be released when a channel is closed. It + * should only be called by the selector thread. + * + * @throws IOException during channel / context close + */ + void closeFromSelector() throws IOException; /** * Schedules a channel to be closed by the selector event loop with which it is registered. - *

+ * * If the channel is open and the state can be transitioned to closed, the close operation will * be scheduled with the event loop. - *

- * If the channel is already set to closed, it is assumed that it is already scheduled to be closed. - *

+ * * Depending on the underlying protocol of the channel, a close operation might simply close the socket * channel or may involve reading and writing messages. */ void closeChannel(); - /** - * This method indicates if a selector should close this channel. - * - * @return a boolean indicating if the selector should close - */ - boolean selectorShouldClose(); - - /** - * This method cleans up any context resources that need to be released when a channel is closed. It - * should only be called by the selector thread. - * - * @throws IOException during channel / context close - */ - void closeFromSelector() throws IOException; - - @FunctionalInterface - interface ReadConsumer { - int consumeReads(InboundChannelBuffer channelBuffer) throws IOException; - } + void handleException(Exception e); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java index a9909587453be..5fc3f46f998e6 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java @@ -89,7 +89,6 @@ private Socket internalCreateChannel(SocketSelector selector, SocketChannel rawC try { Socket channel = createChannel(selector, rawChannel); assert channel.getContext() != null : "channel context should have been set on channel"; - assert channel.getExceptionContext() != null : "exception handler should have been set on channel"; return channel; } catch (Exception e) { closeRawChannel(rawChannel, e); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 42bc0555d509c..7cba9b998b311 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -69,7 +69,7 @@ protected void uncaughtException(Exception exception) { */ protected void handleClose(NioChannel channel) { try { - channel.closeFromSelector(); + channel.getContext().closeFromSelector(); } catch (IOException e) { closeException(channel, e); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index 07b6b68908bd1..f671b39d4d61b 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -59,6 +59,10 @@ public InboundChannelBuffer(Supplier pageSupplier) { ensureCapacity(PAGE_SIZE); } + public static InboundChannelBuffer allocatingInstance() { + return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {})); + } + @Override public void close() { if (isClosed.compareAndSet(false, true)) { diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java index 438c013ecd0aa..690e3d3b38bda 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java @@ -32,6 +32,8 @@ public interface NioChannel { InetSocketAddress getLocalAddress(); + void close(); + void closeFromSelector() throws IOException; void register() throws ClosedChannelException; @@ -42,6 +44,8 @@ public interface NioChannel { NetworkChannel getRawChannel(); + ChannelContext getContext(); + /** * Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee * about the order in which close listeners will be executed. If the channel is already closed, the diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java index 8eb904dc74179..3d1748e413ac7 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java @@ -21,12 +21,13 @@ import java.io.IOException; import java.nio.channels.ServerSocketChannel; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicBoolean; public class NioServerSocketChannel extends AbstractNioChannel { private final ChannelFactory channelFactory; - private Consumer acceptContext; + private ServerChannelContext context; + private final AtomicBoolean contextSet = new AtomicBoolean(false); public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory channelFactory, AcceptingSelector selector) throws IOException { @@ -39,17 +40,22 @@ public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory< } /** - * This method sets the accept context for a server socket channel. The accept context is called when a - * new channel is accepted. The parameter passed to the context is the new channel. + * This method sets the context for a server socket channel. The context is called when a new channel is + * accepted, an exception occurs, or it is time to close the channel. * - * @param acceptContext to call + * @param context to call */ - public void setAcceptContext(Consumer acceptContext) { - this.acceptContext = acceptContext; + public void setContext(ServerChannelContext context) { + if (contextSet.compareAndSet(false, true)) { + this.context = context; + } else { + throw new IllegalStateException("Context on this channel were already set. It should only be once."); + } } - public Consumer getAcceptContext() { - return acceptContext; + @Override + public ServerChannelContext getContext() { + return context; } @Override diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java index c9ea14446d935..aba98ff0cbff0 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java @@ -19,13 +19,10 @@ package org.elasticsearch.nio; -import org.elasticsearch.nio.utils.ExceptionsHelper; - import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -35,9 +32,8 @@ public class NioSocketChannel extends AbstractNioChannel { private final InetSocketAddress remoteAddress; private final CompletableFuture connectContext = new CompletableFuture<>(); private final SocketSelector socketSelector; - private final AtomicBoolean contextsSet = new AtomicBoolean(false); - private ChannelContext context; - private BiConsumer exceptionContext; + private final AtomicBoolean contextSet = new AtomicBoolean(false); + private SocketChannelContext context; private Exception connectException; public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException { @@ -46,25 +42,6 @@ public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) th this.socketSelector = selector; } - @Override - public void closeFromSelector() throws IOException { - getSelector().assertOnSelectorThread(); - if (isOpen()) { - ArrayList closingExceptions = new ArrayList<>(2); - try { - super.closeFromSelector(); - } catch (IOException e) { - closingExceptions.add(e); - } - try { - context.closeFromSelector(); - } catch (IOException e) { - closingExceptions.add(e); - } - ExceptionsHelper.rethrowAndSuppress(closingExceptions); - } - } - @Override public SocketSelector getSelector() { return socketSelector; @@ -94,23 +71,19 @@ public int read(ByteBuffer[] buffers) throws IOException { } } - public void setContexts(ChannelContext context, BiConsumer exceptionContext) { - if (contextsSet.compareAndSet(false, true)) { + public void setContext(SocketChannelContext context) { + if (contextSet.compareAndSet(false, true)) { this.context = context; - this.exceptionContext = exceptionContext; } else { - throw new IllegalStateException("Contexts on this channel were already set. They should only be once."); + throw new IllegalStateException("Context on this channel were already set. It should only be once."); } } - public ChannelContext getContext() { + @Override + public SocketChannelContext getContext() { return context; } - public BiConsumer getExceptionContext() { - return exceptionContext; - } - public InetSocketAddress getRemoteAddress() { return remoteAddress; } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java new file mode 100644 index 0000000000000..551cab48e0577 --- /dev/null +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class ServerChannelContext implements ChannelContext { + + private final NioServerSocketChannel channel; + private final Consumer acceptor; + private final BiConsumer exceptionHandler; + private final AtomicBoolean isClosing = new AtomicBoolean(false); + + public ServerChannelContext(NioServerSocketChannel channel, Consumer acceptor, + BiConsumer exceptionHandler) { + this.channel = channel; + this.acceptor = acceptor; + this.exceptionHandler = exceptionHandler; + } + + public void acceptChannel(NioSocketChannel acceptedChannel) { + acceptor.accept(acceptedChannel); + } + + @Override + public void closeFromSelector() throws IOException { + channel.closeFromSelector(); + } + + @Override + public void closeChannel() { + if (isClosing.compareAndSet(false, true)) { + channel.getSelector().queueChannelClose(channel); + } + } + + @Override + public void handleException(Exception e) { + exceptionHandler.accept(channel, e); + } +} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java new file mode 100644 index 0000000000000..62f82e8995d16 --- /dev/null +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.BiConsumer; + +/** + * This context should implement the specific logic for a channel. When a channel receives a notification + * that it is ready to perform certain operations (read, write, etc) the {@link SocketChannelContext} will + * be called. This context will need to implement all protocol related logic. Additionally, if any special + * close behavior is required, it should be implemented in this context. + * + * The only methods of the context that should ever be called from a non-selector thread are + * {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}. + */ +public abstract class SocketChannelContext implements ChannelContext { + + protected final NioSocketChannel channel; + private final BiConsumer exceptionHandler; + private boolean ioException; + private boolean peerClosed; + + protected SocketChannelContext(NioSocketChannel channel, BiConsumer exceptionHandler) { + this.channel = channel; + this.exceptionHandler = exceptionHandler; + } + + @Override + public void handleException(Exception e) { + exceptionHandler.accept(channel, e); + } + + public void channelRegistered() throws IOException {} + + public abstract int read() throws IOException; + + public abstract void sendMessage(ByteBuffer[] buffers, BiConsumer listener); + + public abstract void queueWriteOperation(WriteOperation writeOperation); + + public abstract void flushChannel() throws IOException; + + public abstract boolean hasQueuedWriteOps(); + + /** + * This method indicates if a selector should close this channel. + * + * @return a boolean indicating if the selector should close + */ + public abstract boolean selectorShouldClose(); + + protected boolean hasIOException() { + return ioException; + } + + protected boolean isPeerClosed() { + return peerClosed; + } + + protected int readFromChannel(ByteBuffer buffer) throws IOException { + try { + int bytesRead = channel.read(buffer); + if (bytesRead < 0) { + peerClosed = true; + bytesRead = 0; + } + return bytesRead; + } catch (IOException e) { + ioException = true; + throw e; + } + } + + protected int readFromChannel(ByteBuffer[] buffers) throws IOException { + try { + int bytesRead = channel.read(buffers); + if (bytesRead < 0) { + peerClosed = true; + bytesRead = 0; + } + return bytesRead; + } catch (IOException e) { + ioException = true; + throw e; + } + } + + protected int flushToChannel(ByteBuffer buffer) throws IOException { + try { + return channel.write(buffer); + } catch (IOException e) { + ioException = true; + throw e; + } + } + + protected int flushToChannel(ByteBuffer[] buffers) throws IOException { + try { + return channel.write(buffers); + } catch (IOException e) { + ioException = true; + throw e; + } + } + + @FunctionalInterface + public interface ReadConsumer { + int consumeReads(InboundChannelBuffer channelBuffer) throws IOException; + } +} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java index d5977cee851ed..b1192f11eb120 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java @@ -44,7 +44,7 @@ public SocketEventHandler(Logger logger) { * @param channel that was registered */ protected void handleRegistration(NioSocketChannel channel) throws IOException { - ChannelContext context = channel.getContext(); + SocketChannelContext context = channel.getContext(); context.channelRegistered(); if (context.hasQueuedWriteOps()) { SelectionKeyUtils.setConnectReadAndWriteInterested(channel); @@ -61,7 +61,7 @@ protected void handleRegistration(NioSocketChannel channel) throws IOException { */ protected void registrationException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", channel), exception); - exceptionCaught(channel, exception); + channel.getContext().handleException(exception); } /** @@ -82,7 +82,7 @@ protected void handleConnect(NioSocketChannel channel) { */ protected void connectException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception); - exceptionCaught(channel, exception); + channel.getContext().handleException(exception); } /** @@ -103,7 +103,7 @@ protected void handleRead(NioSocketChannel channel) throws IOException { */ protected void readException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", channel), exception); - exceptionCaught(channel, exception); + channel.getContext().handleException(exception); } /** @@ -113,7 +113,7 @@ protected void readException(NioSocketChannel channel, Exception exception) { * @param channel that can be written to */ protected void handleWrite(NioSocketChannel channel) throws IOException { - ChannelContext channelContext = channel.getContext(); + SocketChannelContext channelContext = channel.getContext(); channelContext.flushChannel(); } @@ -125,20 +125,7 @@ protected void handleWrite(NioSocketChannel channel) throws IOException { */ protected void writeException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", channel), exception); - exceptionCaught(channel, exception); - } - - /** - * This method is called when handling an event from a channel fails due to an unexpected exception. - * An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw - * {@link java.nio.channels.CancelledKeyException}. - * - * @param channel that caused the exception - * @param exception that was thrown - */ - protected void genericChannelException(NioChannel channel, Exception exception) { - super.genericChannelException(channel, exception); - exceptionCaught((NioSocketChannel) channel, exception); + channel.getContext().handleException(exception); } /** @@ -167,8 +154,4 @@ protected void postHandling(NioSocketChannel channel) { } } } - - private void exceptionCaught(NioSocketChannel channel, Exception e) { - channel.getExceptionContext().accept(channel, e); - } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java index e35aa7b4d226b..2de48fb8899e2 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java @@ -122,7 +122,7 @@ public void queueWrite(WriteOperation writeOperation) { public void queueWriteInChannelBuffer(WriteOperation writeOperation) { assertOnSelectorThread(); NioSocketChannel channel = writeOperation.getChannel(); - ChannelContext context = channel.getContext(); + SocketChannelContext context = channel.getContext(); try { SelectionKeyUtils.setWriteInterested(channel); context.queueWriteOperation(writeOperation); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java index 09800d981bd2d..d2dfe4f37a007 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java @@ -24,7 +24,7 @@ /** * This is a basic write operation that can be queued with a channel. The only requirements of a write * operation is that is has a listener and a reference to its channel. The actual conversion of the write - * operation implementation to bytes will be performed by the {@link ChannelContext}. + * operation implementation to bytes will be performed by the {@link SocketChannelContext}. */ public interface WriteOperation { diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java index 1f51fdc2017ae..23ab3bb3e1d62 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java @@ -27,8 +27,6 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; @@ -41,21 +39,21 @@ public class AcceptorEventHandlerTests extends ESTestCase { private SocketSelector socketSelector; private ChannelFactory channelFactory; private NioServerSocketChannel channel; - private Consumer acceptedChannelCallback; + private ServerChannelContext context; @Before @SuppressWarnings("unchecked") public void setUpHandler() throws IOException { channelFactory = mock(ChannelFactory.class); socketSelector = mock(SocketSelector.class); - acceptedChannelCallback = mock(Consumer.class); + context = mock(ServerChannelContext.class); ArrayList selectors = new ArrayList<>(); selectors.add(socketSelector); handler = new AcceptorEventHandler(logger, new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()]))); AcceptingSelector selector = mock(AcceptingSelector.class); channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector); - channel.setAcceptContext(acceptedChannelCallback); + channel.setContext(context); channel.register(); } @@ -80,11 +78,11 @@ public void testHandleAcceptCallsChannelFactory() throws IOException { @SuppressWarnings("unchecked") public void testHandleAcceptCallsServerAcceptCallback() throws IOException { NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class), socketSelector); - childChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class)); + childChannel.setContext(mock(SocketChannelContext.class)); when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel); handler.acceptChannel(channel); - verify(acceptedChannelCallback).accept(childChannel); + verify(context).acceptChannel(childChannel); } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java index db0e6ae80badf..68ae1f2e50304 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java @@ -40,7 +40,7 @@ public class BytesChannelContextTests extends ESTestCase { - private ChannelContext.ReadConsumer readConsumer; + private SocketChannelContext.ReadConsumer readConsumer; private NioSocketChannel channel; private BytesChannelContext context; private InboundChannelBuffer channelBuffer; @@ -51,16 +51,14 @@ public class BytesChannelContextTests extends ESTestCase { @Before @SuppressWarnings("unchecked") public void init() { - readConsumer = mock(ChannelContext.ReadConsumer.class); + readConsumer = mock(SocketChannelContext.ReadConsumer.class); messageLength = randomInt(96) + 20; selector = mock(SocketSelector.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); - Supplier pageSupplier = () -> - new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {}); - channelBuffer = new InboundChannelBuffer(pageSupplier); - context = new BytesChannelContext(channel, readConsumer, channelBuffer); + channelBuffer = InboundChannelBuffer.allocatingInstance(); + context = new BytesChannelContext(channel, null, readConsumer, channelBuffer); when(channel.getSelector()).thenReturn(selector); when(selector.isOnCurrentThread()).thenReturn(true); @@ -153,11 +151,12 @@ public void testReadLessThanZeroMeansReadyForClose() throws IOException { } public void testCloseClosesChannelBuffer() throws IOException { + when(channel.isOpen()).thenReturn(true); Runnable closer = mock(Runnable.class); Supplier pageSupplier = () -> new InboundChannelBuffer.Page(ByteBuffer.allocate(1 << 14), closer); InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); buffer.ensureCapacity(1); - BytesChannelContext context = new BytesChannelContext(channel, readConsumer, buffer); + BytesChannelContext context = new BytesChannelContext(channel, null, readConsumer, buffer); context.closeFromSelector(); verify(closer).run(); } @@ -218,6 +217,7 @@ public void testWriteOpsClearedOnClose() throws Exception { assertTrue(context.hasQueuedWriteOps()); + when(channel.isOpen()).thenReturn(true); context.closeFromSelector(); verify(selector).executeFailedListener(same(listener), any(ClosedChannelException.class)); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java index e3f42139fd80e..1c8a8a130ccfa 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java @@ -28,7 +28,6 @@ import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.function.BiConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; @@ -139,7 +138,7 @@ private static class TestChannelFactory extends ChannelFactory() { - @Override - public void onResponse(Void o) { - isClosed.set(true); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - isClosed.set(true); - latch.countDown(); - } - })); - - assertTrue(channel.isOpen()); - assertFalse(closedRawChannel.get()); - assertFalse(isClosed.get()); - - PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - channel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); - selector.queueChannelClose(channel); - closeFuture.actionGet(); - - - assertTrue(closedRawChannel.get()); - assertFalse(channel.isOpen()); - latch.await(); - assertTrue(isClosed.get()); - } - - private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { - - private DoNotCloseServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) - throws IOException { - super(channel, channelFactory, selector); - } - - @Override - void closeRawChannel() throws IOException { - closedRawChannel.set(true); + try (ServerSocketChannel rawChannel = ServerSocketChannel.open()) { + NioServerSocketChannel channel = new NioServerSocketChannel(rawChannel, mock(ChannelFactory.class), selector); + channel.setContext(new ServerChannelContext(channel, mock(Consumer.class), mock(BiConsumer.class))); + channel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { + @Override + public void onResponse(Void o) { + isClosed.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + isClosed.set(true); + latch.countDown(); + } + })); + + assertTrue(channel.isOpen()); + assertTrue(rawChannel.isOpen()); + assertFalse(isClosed.get()); + + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + channel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); + selector.queueChannelClose(channel); + closeFuture.actionGet(); + + + assertFalse(rawChannel.isOpen()); + assertFalse(channel.isOpen()); + latch.await(); + assertTrue(isClosed.get()); } } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java index dd0956458fad3..bbda9233bbb80 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java @@ -35,14 +35,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class NioSocketChannelTests extends ESTestCase { private SocketSelector selector; - private AtomicBoolean closedRawChannel; private Thread thread; @Before @@ -50,7 +48,6 @@ public class NioSocketChannelTests extends ESTestCase { public void startSelector() throws IOException { selector = new SocketSelector(new SocketEventHandler(logger)); thread = new Thread(selector::runLoop); - closedRawChannel = new AtomicBoolean(false); thread.start(); FutureUtils.get(selector.isRunningFuture()); } @@ -66,80 +63,46 @@ public void testClose() throws Exception { AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); - NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); - socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class)); - socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { - @Override - public void onResponse(Void o) { - isClosed.set(true); - latch.countDown(); - } - @Override - public void onFailure(Exception e) { - isClosed.set(true); - latch.countDown(); - } - })); - - assertTrue(socketChannel.isOpen()); - assertFalse(closedRawChannel.get()); - assertFalse(isClosed.get()); - - PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); - selector.queueChannelClose(socketChannel); - closeFuture.actionGet(); - - assertTrue(closedRawChannel.get()); - assertFalse(socketChannel.isOpen()); - latch.await(); - assertTrue(isClosed.get()); - } - - @SuppressWarnings("unchecked") - public void testCloseContextExceptionDoesNotStopClose() throws Exception { - AtomicBoolean isClosed = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - - IOException ioException = new IOException(); - NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); - ChannelContext context = mock(ChannelContext.class); - doThrow(ioException).when(context).closeFromSelector(); - socketChannel.setContexts(context, mock(BiConsumer.class)); - socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { - @Override - public void onResponse(Void o) { - isClosed.set(true); - latch.countDown(); - } - @Override - public void onFailure(Exception e) { - isClosed.set(true); - latch.countDown(); - } - })); - - assertTrue(socketChannel.isOpen()); - assertFalse(closedRawChannel.get()); - assertFalse(isClosed.get()); - - PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); - selector.queueChannelClose(socketChannel); - closeFuture.actionGet(); - - assertTrue(closedRawChannel.get()); - assertFalse(socketChannel.isOpen()); - latch.await(); - assertTrue(isClosed.get()); + try(SocketChannel rawChannel = SocketChannel.open()) { + NioSocketChannel socketChannel = new NioSocketChannel(rawChannel, selector); + socketChannel.setContext(new BytesChannelContext(socketChannel, mock(BiConsumer.class), + mock(SocketChannelContext.ReadConsumer.class), InboundChannelBuffer.allocatingInstance())); + socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { + @Override + public void onResponse(Void o) { + isClosed.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + isClosed.set(true); + latch.countDown(); + } + })); + + assertTrue(socketChannel.isOpen()); + assertTrue(rawChannel.isOpen()); + assertFalse(isClosed.get()); + + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); + selector.queueChannelClose(socketChannel); + closeFuture.actionGet(); + + assertFalse(rawChannel.isOpen()); + assertFalse(socketChannel.isOpen()); + latch.await(); + assertTrue(isClosed.get()); + } } @SuppressWarnings("unchecked") public void testConnectSucceeds() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenReturn(true); - NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); - socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class)); + NioSocketChannel socketChannel = new DoNotRegisterChannel(rawChannel, selector); + socketChannel.setContext(mock(SocketChannelContext.class)); selector.scheduleForRegistration(socketChannel); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); @@ -148,15 +111,14 @@ public void testConnectSucceeds() throws Exception { assertTrue(socketChannel.isConnectComplete()); assertTrue(socketChannel.isOpen()); - assertFalse(closedRawChannel.get()); } @SuppressWarnings("unchecked") public void testConnectFails() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenThrow(new ConnectException()); - NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); - socketChannel.setContexts(mock(ChannelContext.class), mock(BiConsumer.class)); + NioSocketChannel socketChannel = new DoNotRegisterChannel(rawChannel, selector); + socketChannel.setContext(mock(SocketChannelContext.class)); selector.scheduleForRegistration(socketChannel); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); @@ -168,16 +130,4 @@ public void testConnectFails() throws Exception { // Even if connection fails the channel is 'open' until close() is called assertTrue(socketChannel.isOpen()); } - - private class DoNotCloseChannel extends DoNotRegisterChannel { - - private DoNotCloseChannel(SocketChannel channel, SocketSelector selector) throws IOException { - super(channel, selector); - } - - @Override - void closeRawChannel() throws IOException { - closedRawChannel.set(true); - } - } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java index e0f833c9051d0..d74214636dbdd 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java @@ -53,9 +53,8 @@ public void setUpHandler() throws IOException { channel = new DoNotRegisterChannel(rawChannel, socketSelector); when(rawChannel.finishConnect()).thenReturn(true); - Supplier pageSupplier = () -> new InboundChannelBuffer.Page(ByteBuffer.allocate(1 << 14), () -> {}); - InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); - channel.setContexts(new BytesChannelContext(channel, mock(ChannelContext.ReadConsumer.class), buffer), exceptionHandler); + InboundChannelBuffer buffer = InboundChannelBuffer.allocatingInstance(); + channel.setContext(new BytesChannelContext(channel, exceptionHandler, mock(SocketChannelContext.ReadConsumer.class), buffer)); channel.register(); channel.finishConnect(); @@ -64,7 +63,7 @@ public void setUpHandler() throws IOException { public void testRegisterCallsContext() throws IOException { NioSocketChannel channel = mock(NioSocketChannel.class); - ChannelContext channelContext = mock(ChannelContext.class); + SocketChannelContext channelContext = mock(SocketChannelContext.class); when(channel.getContext()).thenReturn(channelContext); when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0)); handler.handleRegistration(channel); @@ -102,8 +101,8 @@ public void testConnectExceptionCallsExceptionHandler() throws IOException { public void testHandleReadDelegatesToContext() throws IOException { NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class)); - ChannelContext context = mock(ChannelContext.class); - channel.setContexts(context, exceptionHandler); + SocketChannelContext context = mock(SocketChannelContext.class); + channel.setContext(context); when(context.read()).thenReturn(1); handler.handleRead(channel); @@ -124,19 +123,19 @@ public void testWriteExceptionCallsExceptionHandler() { public void testPostHandlingCallWillCloseTheChannelIfReady() throws IOException { NioSocketChannel channel = mock(NioSocketChannel.class); - ChannelContext context = mock(ChannelContext.class); + SocketChannelContext context = mock(SocketChannelContext.class); when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0)); when(channel.getContext()).thenReturn(context); when(context.selectorShouldClose()).thenReturn(true); handler.postHandling(channel); - verify(channel).closeFromSelector(); + verify(context).closeFromSelector(); } public void testPostHandlingCallWillNotCloseTheChannelIfNotReady() throws IOException { NioSocketChannel channel = mock(NioSocketChannel.class); - ChannelContext context = mock(ChannelContext.class); + SocketChannelContext context = mock(SocketChannelContext.class); when(channel.getSelectionKey()).thenReturn(new TestSelectionKey(0)); when(channel.getContext()).thenReturn(context); @@ -149,8 +148,8 @@ public void testPostHandlingCallWillNotCloseTheChannelIfNotReady() throws IOExce public void testPostHandlingWillAddWriteIfNecessary() throws IOException { NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class)); channel.setSelectionKey(new TestSelectionKey(SelectionKey.OP_READ)); - ChannelContext context = mock(ChannelContext.class); - channel.setContexts(context, null); + SocketChannelContext context = mock(SocketChannelContext.class); + channel.setContext(context); when(context.hasQueuedWriteOps()).thenReturn(true); @@ -162,8 +161,8 @@ public void testPostHandlingWillAddWriteIfNecessary() throws IOException { public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException { NioSocketChannel channel = new DoNotRegisterChannel(rawChannel, mock(SocketSelector.class)); channel.setSelectionKey(new TestSelectionKey(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); - ChannelContext context = mock(ChannelContext.class); - channel.setContexts(context, null); + SocketChannelContext context = mock(SocketChannelContext.class); + channel.setContext(context); when(context.hasQueuedWriteOps()).thenReturn(false); @@ -171,10 +170,4 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException { handler.postHandling(channel); assertEquals(SelectionKey.OP_READ, channel.getSelectionKey().interestOps()); } - - private void setWriteAndRead(NioChannel channel) { - SelectionKeyUtils.setConnectAndReadInterested(channel); - SelectionKeyUtils.removeConnectInterested(channel); - SelectionKeyUtils.setWriteInterested(channel); - } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java index 9197fe38dbc0a..5992244b2f930 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java @@ -49,7 +49,7 @@ public class SocketSelectorTests extends ESTestCase { private SocketEventHandler eventHandler; private NioSocketChannel channel; private TestSelectionKey selectionKey; - private ChannelContext channelContext; + private SocketChannelContext channelContext; private BiConsumer listener; private ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; private Selector rawSelector; @@ -60,7 +60,7 @@ public void setUp() throws Exception { super.setUp(); eventHandler = mock(SocketEventHandler.class); channel = mock(NioSocketChannel.class); - channelContext = mock(ChannelContext.class); + channelContext = mock(SocketChannelContext.class); listener = mock(BiConsumer.class); selectionKey = new TestSelectionKey(0); selectionKey.attach(channel); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index d25d3c5974ad8..acea1ca5d482e 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -31,14 +31,15 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.AcceptingSelector; import org.elasticsearch.nio.AcceptorEventHandler; -import org.elasticsearch.nio.BytesChannelContext; -import org.elasticsearch.nio.ChannelContext; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketEventHandler; import org.elasticsearch.nio.SocketSelector; import org.elasticsearch.threadpool.ThreadPool; @@ -52,6 +53,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import java.util.function.Supplier; import static org.elasticsearch.common.settings.Setting.intSetting; @@ -182,18 +184,21 @@ public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - ChannelContext.ReadConsumer nioReadConsumer = channelBuffer -> + SocketChannelContext.ReadConsumer nioReadConsumer = channelBuffer -> consumeNetworkReads(nioChannel, BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()))); - BytesChannelContext context = new BytesChannelContext(nioChannel, nioReadConsumer, new InboundChannelBuffer(pageSupplier)); - nioChannel.setContexts(context, NioTransport.this::exceptionCaught); + BiConsumer exceptionHandler = NioTransport.this::exceptionCaught; + BytesChannelContext context = new BytesChannelContext(nioChannel, exceptionHandler, nioReadConsumer, + new InboundChannelBuffer(pageSupplier)); + nioChannel.setContext(context); return nioChannel; } @Override public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { - TcpNioServerSocketChannel nioServerChannel = new TcpNioServerSocketChannel(profileName, channel, this, selector); - nioServerChannel.setAcceptContext(NioTransport.this::acceptChannel); - return nioServerChannel; + TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel, this, selector); + ServerChannelContext context = new ServerChannelContext(nioChannel, NioTransport.this::acceptChannel, (c, e) -> {}); + nioChannel.setContext(context); + return nioChannel; } } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java index f0d01bf5a7da6..683ae146cfb9c 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java @@ -39,8 +39,8 @@ public class TcpNioServerSocketChannel extends NioServerSocketChannel implements private final String profile; public TcpNioServerSocketChannel(String profile, ServerSocketChannel socketChannel, - ChannelFactory channelFactory, - AcceptingSelector selector) throws IOException { + ChannelFactory channelFactory, + AcceptingSelector selector) throws IOException { super(socketChannel, channelFactory, selector); this.profile = profile; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index c5ec4c6bfb7aa..ec262261e54c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -32,12 +32,13 @@ import org.elasticsearch.nio.AcceptingSelector; import org.elasticsearch.nio.AcceptorEventHandler; import org.elasticsearch.nio.BytesChannelContext; -import org.elasticsearch.nio.ChannelContext; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.SocketSelector; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; @@ -161,17 +162,19 @@ public MockSocketChannel createChannel(SocketSelector selector, SocketChannel ch Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - ChannelContext.ReadConsumer nioReadConsumer = channelBuffer -> + SocketChannelContext.ReadConsumer nioReadConsumer = channelBuffer -> consumeNetworkReads(nioChannel, BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()))); - BytesChannelContext context = new BytesChannelContext(nioChannel, nioReadConsumer, new InboundChannelBuffer(pageSupplier)); - nioChannel.setContexts(context, MockNioTransport.this::exceptionCaught); + BytesChannelContext context = new BytesChannelContext(nioChannel, MockNioTransport.this::exceptionCaught, nioReadConsumer, + new InboundChannelBuffer(pageSupplier)); + nioChannel.setContext(context); return nioChannel; } @Override public MockServerChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector); - nioServerChannel.setAcceptContext(MockNioTransport.this::acceptChannel); + ServerChannelContext context = new ServerChannelContext(nioServerChannel, MockNioTransport.this::acceptChannel, (c, e) -> {}); + nioServerChannel.setContext(context); return nioServerChannel; } }