Skip to content

Commit

Permalink
Implement socket and server ChannelContexts (#28275)
Browse files Browse the repository at this point in the history
This commit is related to #27260. Currently have a channel context that
implements reading and writing logic for socket channels. Additionally,
we have exception contexts to handle exceptions. And accepting contexts
to handle accepted channels. This PR introduces a ChannelContext that
handles close and exception handling for all channel types.
Additionally, it has implementers that provide specific functionality
for socket channels (read and writing). And specific functionality for
server channels (accepting).
  • Loading branch information
Tim-Brooks authored Jan 18, 2018
1 parent de9d903 commit a6a57a7
Show file tree
Hide file tree
Showing 25 changed files with 393 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void closeFromSelector() throws IOException {
selector.assertOnSelectorThread();
if (closeContext.isDone() == false) {
try {
closeRawChannel();
socketChannel.close();
closeContext.complete(null);
} catch (IOException e) {
closeContext.completeExceptionally(e);
Expand Down Expand Up @@ -119,13 +119,13 @@ public void addCloseListener(BiConsumer<Void, Throwable> listener) {
closeContext.whenComplete(listener);
}

@Override
public void close() {
getContext().closeChannel();
}

// Package visibility for testing
void setSelectionKey(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
// Package visibility for testing

void closeRawChannel() throws IOException {
socketChannel.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOE
ChannelFactory<?, ?> channelFactory = nioServerChannel.getChannelFactory();
SocketSelector selector = selectorSupplier.get();
NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector);
nioServerChannel.getAcceptContext().accept(nioSocketChannel);
nioServerChannel.getContext().acceptChannel(nioSocketChannel);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,30 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class BytesChannelContext implements ChannelContext {
public class BytesChannelContext extends SocketChannelContext {

private final NioSocketChannel channel;
private final ReadConsumer readConsumer;
private final InboundChannelBuffer channelBuffer;
private final LinkedList<BytesWriteOperation> queued = new LinkedList<>();
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private boolean peerClosed = false;
private boolean ioException = false;

public BytesChannelContext(NioSocketChannel channel, ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
this.channel = channel;
public BytesChannelContext(NioSocketChannel channel, BiConsumer<NioSocketChannel, Exception> exceptionHandler,
ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
super(channel, exceptionHandler);
this.readConsumer = readConsumer;
this.channelBuffer = channelBuffer;
}

@Override
public void channelRegistered() throws IOException {}

@Override
public int read() throws IOException {
if (channelBuffer.getRemaining() == 0) {
// Requiring one additional byte will ensure that a new page is allocated.
channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1);
}

int bytesRead;
try {
bytesRead = channel.read(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));
} catch (IOException ex) {
ioException = true;
throw ex;
}
int bytesRead = readFromChannel(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));

if (bytesRead == -1) {
peerClosed = true;
if (bytesRead == 0) {
return 0;
}

Expand Down Expand Up @@ -90,7 +78,6 @@ public void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listen
return;
}

// TODO: Eval if we will allow writes from sendMessage
selector.queueWriteInChannelBuffer(writeOperation);
}

Expand Down Expand Up @@ -126,28 +113,38 @@ public void closeChannel() {

@Override
public boolean selectorShouldClose() {
return peerClosed || ioException || isClosing.get();
return isPeerClosed() || hasIOException() || isClosing.get();
}

@Override
public void closeFromSelector() {
public void closeFromSelector() throws IOException {
channel.getSelector().assertOnSelectorThread();
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
if (channel.isOpen()) {
IOException channelCloseException = null;
try {
channel.closeFromSelector();
} catch (IOException e) {
channelCloseException = e;
}
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
}
queued.clear();
if (channelCloseException != null) {
throw channelCloseException;
}
}
queued.clear();
}

private void singleFlush(BytesWriteOperation headOp) throws IOException {
try {
int written = channel.write(headOp.getBuffersToWrite());
int written = flushToChannel(headOp.getBuffersToWrite());
headOp.incrementIndex(written);
} catch (IOException e) {
channel.getSelector().executeFailedListener(headOp.getListener(), e);
ioException = true;
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,26 @@
package org.elasticsearch.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

/**
* This context should implement the specific logic for a channel. When a channel receives a notification
* that it is ready to perform certain operations (read, write, etc) the {@link ChannelContext} will be
* called. This context will need to implement all protocol related logic. Additionally, if any special
* close behavior is required, it should be implemented in this context.
*
* The only methods of the context that should ever be called from a non-selector thread are
* {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}.
*/
public interface ChannelContext {

void channelRegistered() throws IOException;

int read() throws IOException;

void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener);

void queueWriteOperation(WriteOperation writeOperation);

void flushChannel() throws IOException;

boolean hasQueuedWriteOps();
/**
* This method cleans up any context resources that need to be released when a channel is closed. It
* should only be called by the selector thread.
*
* @throws IOException during channel / context close
*/
void closeFromSelector() throws IOException;

/**
* Schedules a channel to be closed by the selector event loop with which it is registered.
* <p>
*
* If the channel is open and the state can be transitioned to closed, the close operation will
* be scheduled with the event loop.
* <p>
* If the channel is already set to closed, it is assumed that it is already scheduled to be closed.
* <p>
*
* Depending on the underlying protocol of the channel, a close operation might simply close the socket
* channel or may involve reading and writing messages.
*/
void closeChannel();

/**
* This method indicates if a selector should close this channel.
*
* @return a boolean indicating if the selector should close
*/
boolean selectorShouldClose();

/**
* This method cleans up any context resources that need to be released when a channel is closed. It
* should only be called by the selector thread.
*
* @throws IOException during channel / context close
*/
void closeFromSelector() throws IOException;

@FunctionalInterface
interface ReadConsumer {
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;
}
void handleException(Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ private Socket internalCreateChannel(SocketSelector selector, SocketChannel rawC
try {
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
assert channel.getExceptionContext() != null : "exception handler should have been set on channel";
return channel;
} catch (Exception e) {
closeRawChannel(rawChannel, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected void uncaughtException(Exception exception) {
*/
protected void handleClose(NioChannel channel) {
try {
channel.closeFromSelector();
channel.getContext().closeFromSelector();
} catch (IOException e) {
closeException(channel, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public InboundChannelBuffer(Supplier<Page> pageSupplier) {
ensureCapacity(PAGE_SIZE);
}

public static InboundChannelBuffer allocatingInstance() {
return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {}));
}

@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface NioChannel {

InetSocketAddress getLocalAddress();

void close();

void closeFromSelector() throws IOException;

void register() throws ClosedChannelException;
Expand All @@ -42,6 +44,8 @@ public interface NioChannel {

NetworkChannel getRawChannel();

ChannelContext getContext();

/**
* Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee
* about the order in which close listeners will be executed. If the channel is already closed, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;

public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> {

private final ChannelFactory<?, ?> channelFactory;
private Consumer<NioSocketChannel> acceptContext;
private ServerChannelContext context;
private final AtomicBoolean contextSet = new AtomicBoolean(false);

public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException {
Expand All @@ -39,17 +40,22 @@ public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<
}

/**
* This method sets the accept context for a server socket channel. The accept context is called when a
* new channel is accepted. The parameter passed to the context is the new channel.
* This method sets the context for a server socket channel. The context is called when a new channel is
* accepted, an exception occurs, or it is time to close the channel.
*
* @param acceptContext to call
* @param context to call
*/
public void setAcceptContext(Consumer<NioSocketChannel> acceptContext) {
this.acceptContext = acceptContext;
public void setContext(ServerChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
} else {
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}

public Consumer<NioSocketChannel> getAcceptContext() {
return acceptContext;
@Override
public ServerChannelContext getContext() {
return context;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

package org.elasticsearch.nio;

import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand All @@ -35,9 +32,8 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
private final InetSocketAddress remoteAddress;
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
private final SocketSelector socketSelector;
private final AtomicBoolean contextsSet = new AtomicBoolean(false);
private ChannelContext context;
private BiConsumer<NioSocketChannel, Exception> exceptionContext;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private SocketChannelContext context;
private Exception connectException;

public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException {
Expand All @@ -46,25 +42,6 @@ public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) th
this.socketSelector = selector;
}

@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(2);
try {
super.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
try {
context.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}

@Override
public SocketSelector getSelector() {
return socketSelector;
Expand Down Expand Up @@ -94,23 +71,19 @@ public int read(ByteBuffer[] buffers) throws IOException {
}
}

public void setContexts(ChannelContext context, BiConsumer<NioSocketChannel, Exception> exceptionContext) {
if (contextsSet.compareAndSet(false, true)) {
public void setContext(SocketChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
this.exceptionContext = exceptionContext;
} else {
throw new IllegalStateException("Contexts on this channel were already set. They should only be once.");
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}

public ChannelContext getContext() {
@Override
public SocketChannelContext getContext() {
return context;
}

public BiConsumer<NioSocketChannel, Exception> getExceptionContext() {
return exceptionContext;
}

public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
Expand Down
Loading

0 comments on commit a6a57a7

Please sign in to comment.