diff --git a/server/src/main/java/org/opensearch/transport/BytesTransportRequest.java b/server/src/main/java/org/opensearch/transport/BytesTransportRequest.java index 3fd84432639d4..9102e35f2fed4 100644 --- a/server/src/main/java/org/opensearch/transport/BytesTransportRequest.java +++ b/server/src/main/java/org/opensearch/transport/BytesTransportRequest.java @@ -47,7 +47,7 @@ */ public class BytesTransportRequest extends TransportRequest { - BytesReference bytes; + public BytesReference bytes; Version version; public BytesTransportRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java index 57707d3b44477..30b4652a82663 100644 --- a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java +++ b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java @@ -58,13 +58,13 @@ * * @opensearch.internal */ -final class CompressibleBytesOutputStream extends StreamOutput { +public final class CompressibleBytesOutputStream extends StreamOutput { private final OutputStream stream; private final BytesStream bytesStreamOutput; private final boolean shouldCompress; - CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { + public CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { @@ -80,7 +80,7 @@ final class CompressibleBytesOutputStream extends StreamOutput { * @return bytes underlying the stream * @throws IOException if an exception occurs when writing or flushing */ - BytesReference materializeBytes() throws IOException { + public BytesReference materializeBytes() throws IOException { // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when // passed to the deflater stream. diff --git a/server/src/main/java/org/opensearch/transport/NetworkMessage.java b/server/src/main/java/org/opensearch/transport/NetworkMessage.java index f02d664b65929..f6f3057e041db 100644 --- a/server/src/main/java/org/opensearch/transport/NetworkMessage.java +++ b/server/src/main/java/org/opensearch/transport/NetworkMessage.java @@ -48,7 +48,7 @@ public abstract class NetworkMessage { protected final long requestId; protected final byte status; - NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId) { + public NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId) { this.threadContext = threadContext.captureAsWriteable(); this.version = version; this.requestId = requestId; diff --git a/server/src/main/java/org/opensearch/transport/OutboundHandler.java b/server/src/main/java/org/opensearch/transport/OutboundHandler.java index b83dbdd0effe4..4c9d0ca3afd7b 100644 --- a/server/src/main/java/org/opensearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/OutboundHandler.java @@ -32,28 +32,22 @@ package org.opensearch.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; -import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lease.Releasables; import org.opensearch.common.network.CloseableChannel; -import org.opensearch.common.transport.NetworkExceptionHelper; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.action.NotifyOnceListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.OutboundMessageHandler.SendContext; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; +import org.opensearch.transport.nativeprotocol.NativeOutboundMessageHandler; import java.io.IOException; +import java.util.Map; import java.util.Set; /** @@ -63,15 +57,12 @@ */ final class OutboundHandler { - private static final Logger logger = LogManager.getLogger(OutboundHandler.class); - private final String nodeName; private final Version version; - private final String[] features; private final StatsTracker statsTracker; private final ThreadPool threadPool; - private final BigArrays bigArrays; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; + private final Map protocolMessageHandlers; OutboundHandler( String nodeName, @@ -83,14 +74,16 @@ final class OutboundHandler { ) { this.nodeName = nodeName; this.version = version; - this.features = features; this.statsTracker = statsTracker; this.threadPool = threadPool; - this.bigArrays = bigArrays; + this.protocolMessageHandlers = Map.of( + NativeInboundMessage.NATIVE_PROTOCOL, + new NativeOutboundMessageHandler(features, statsTracker, threadPool, bigArrays) + ); } void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener listener) { - SendContext sendContext = new SendContext(channel, () -> bytes, listener); + SendContext sendContext = new SendContext(channel, () -> bytes, listener, statsTracker); try { internalSend(channel, sendContext); } catch (IOException e) { @@ -115,18 +108,18 @@ void sendRequest( final boolean isHandshake ) throws IOException, TransportException { Version version = Version.min(this.version, channelVersion); - OutboundMessage.Request message = new OutboundMessage.Request( - threadPool.getThreadContext(), - features, + // TODO: Add logic for protocols in transport message + OutboundMessageHandler outboundMessageHandler = protocolMessageHandlers.get("native"); + ProtocolOutboundMessage message = outboundMessageHandler.convertRequestToOutboundMessage( request, - version, action, requestId, isHandshake, - compressRequest + compressRequest, + version ); ActionListener listener = ActionListener.wrap(() -> messageListener.onRequestSent(node, requestId, action, request, options)); - sendMessage(channel, message, listener); + outboundMessageHandler.sendMessage(channel, message, listener); } /** @@ -146,17 +139,18 @@ void sendResponse( final boolean isHandshake ) throws IOException { Version version = Version.min(this.version, nodeVersion); - OutboundMessage.Response message = new OutboundMessage.Response( - threadPool.getThreadContext(), - features, + // TODO: Add logic for protocols in transport message + OutboundMessageHandler outboundMessageHandler = protocolMessageHandlers.get("native"); + ProtocolOutboundMessage message = outboundMessageHandler.convertResponseToOutboundMessage( response, - version, + features, requestId, isHandshake, - compress + compress, + version ); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response)); - sendMessage(channel, message, listener); + outboundMessageHandler.sendMessage(channel, message, listener); } /** @@ -173,23 +167,18 @@ void sendErrorResponse( Version version = Version.min(this.version, nodeVersion); TransportAddress address = new TransportAddress(channel.getLocalAddress()); RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error); - OutboundMessage.Response message = new OutboundMessage.Response( - threadPool.getThreadContext(), - features, + // TODO: Add logic for protocols in transport message + OutboundMessageHandler outboundMessageHandler = protocolMessageHandlers.get("native"); + ProtocolOutboundMessage message = outboundMessageHandler.convertErrorResponseToOutboundMessage( tx, - version, + features, requestId, false, - false + false, + version ); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error)); - sendMessage(channel, message, listener); - } - - private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener listener) throws IOException { - MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays); - SendContext sendContext = new SendContext(channel, serializer, listener, serializer); - internalSend(channel, sendContext); + outboundMessageHandler.sendMessage(channel, message, listener); } private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException { @@ -213,94 +202,4 @@ void setMessageListener(TransportMessageListener listener) { } } - /** - * Internal message serializer - * - * @opensearch.internal - */ - private static class MessageSerializer implements CheckedSupplier, Releasable { - - private final OutboundMessage message; - private final BigArrays bigArrays; - private volatile ReleasableBytesStreamOutput bytesStreamOutput; - - private MessageSerializer(OutboundMessage message, BigArrays bigArrays) { - this.message = message; - this.bigArrays = bigArrays; - } - - @Override - public BytesReference get() throws IOException { - bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); - return message.serialize(bytesStreamOutput); - } - - @Override - public void close() { - IOUtils.closeWhileHandlingException(bytesStreamOutput); - } - } - - private class SendContext extends NotifyOnceListener implements CheckedSupplier { - - private final TcpChannel channel; - private final CheckedSupplier messageSupplier; - private final ActionListener listener; - private final Releasable optionalReleasable; - private long messageSize = -1; - - private SendContext( - TcpChannel channel, - CheckedSupplier messageSupplier, - ActionListener listener - ) { - this(channel, messageSupplier, listener, null); - } - - private SendContext( - TcpChannel channel, - CheckedSupplier messageSupplier, - ActionListener listener, - Releasable optionalReleasable - ) { - this.channel = channel; - this.messageSupplier = messageSupplier; - this.listener = listener; - this.optionalReleasable = optionalReleasable; - } - - public BytesReference get() throws IOException { - BytesReference message; - try { - message = messageSupplier.get(); - messageSize = message.length(); - TransportLogger.logOutboundMessage(channel, message); - return message; - } catch (Exception e) { - onFailure(e); - throw e; - } - } - - @Override - protected void innerOnResponse(Void v) { - assert messageSize != -1 : "If onResponse is being called, the message should have been serialized"; - statsTracker.markBytesWritten(messageSize); - closeAndCallback(() -> listener.onResponse(v)); - } - - @Override - protected void innerOnFailure(Exception e) { - if (NetworkExceptionHelper.isCloseConnectionException(e)) { - logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); - } else { - logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); - } - closeAndCallback(() -> listener.onFailure(e)); - } - - private void closeAndCallback(Runnable runnable) { - Releasables.close(optionalReleasable, runnable::run); - } - } } diff --git a/server/src/main/java/org/opensearch/transport/OutboundMessageHandler.java b/server/src/main/java/org/opensearch/transport/OutboundMessageHandler.java new file mode 100644 index 0000000000000..8897b3c3b0a15 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/OutboundMessageHandler.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.network.CloseableChannel; +import org.opensearch.common.transport.NetworkExceptionHelper; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Set; + +/** + * Interface for outbound message handler. Can be implemented for different transport protocols. + * + * @opensearch.internal + */ +public interface OutboundMessageHandler { + + static final Logger logger = LogManager.getLogger(OutboundHandler.class); + + void sendMessage(TcpChannel channel, ProtocolOutboundMessage networkMessage, ActionListener listener) throws IOException; + + ProtocolOutboundMessage convertRequestToOutboundMessage( + final TransportRequest request, + final String action, + final long requestId, + final boolean isHandshake, + final boolean compressRequest, + final Version channelVersion + ); + + ProtocolOutboundMessage convertResponseToOutboundMessage( + final TransportResponse response, + final Set features, + final long requestId, + final boolean isHandshake, + final boolean compressRequest, + final Version channelVersion + ); + + ProtocolOutboundMessage convertErrorResponseToOutboundMessage( + final RemoteTransportException tx, + final Set features, + final long requestId, + final boolean isHandshake, + final boolean compress, + final Version channelVersion + ); + + public default void internalSend(TcpChannel channel, SendContext sendContext, ThreadPool threadPool) throws IOException { + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + BytesReference reference = sendContext.get(); + // stash thread context so that channel event loop is not polluted by thread context + try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) { + channel.sendMessage(reference, sendContext); + } catch (RuntimeException ex) { + sendContext.onFailure(ex); + CloseableChannel.closeChannel(channel); + throw ex; + } + } + + /** + * Context for sending a message. + */ + public class SendContext extends NotifyOnceListener implements CheckedSupplier { + + private final TcpChannel channel; + private final CheckedSupplier messageSupplier; + private final ActionListener listener; + private final Releasable optionalReleasable; + private final StatsTracker statsTracker; + private long messageSize = -1; + + public SendContext( + TcpChannel channel, + CheckedSupplier messageSupplier, + ActionListener listener, + StatsTracker statsTracker + ) { + this(channel, messageSupplier, listener, null, statsTracker); + } + + public SendContext( + TcpChannel channel, + CheckedSupplier messageSupplier, + ActionListener listener, + Releasable optionalReleasable, + StatsTracker statsTracker + ) { + this.channel = channel; + this.messageSupplier = messageSupplier; + this.listener = listener; + this.optionalReleasable = optionalReleasable; + this.statsTracker = statsTracker; + } + + public BytesReference get() throws IOException { + BytesReference message; + try { + message = messageSupplier.get(); + messageSize = message.length(); + TransportLogger.logOutboundMessage(channel, message); + return message; + } catch (Exception e) { + onFailure(e); + throw e; + } + } + + @Override + protected void innerOnResponse(Void v) { + assert messageSize != -1 : "If onResponse is being called, the message should have been serialized"; + statsTracker.markBytesWritten(messageSize); + closeAndCallback(() -> listener.onResponse(v)); + } + + @Override + protected void innerOnFailure(Exception e) { + if (NetworkExceptionHelper.isCloseConnectionException(e)) { + logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); + } else { + logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); + } + closeAndCallback(() -> listener.onFailure(e)); + } + + private void closeAndCallback(Runnable runnable) { + Releasables.close(optionalReleasable, runnable::run); + } + } + +} diff --git a/server/src/main/java/org/opensearch/transport/ProtocolOutboundMessage.java b/server/src/main/java/org/opensearch/transport/ProtocolOutboundMessage.java new file mode 100644 index 0000000000000..3914a1392aacd --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ProtocolOutboundMessage.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; + +/** + * Base class for outbound data as a message. + * Different implementations are used for different protocols. + * + * @opensearch.internal + */ +public interface ProtocolOutboundMessage { + + BytesReference serialize(BytesStreamOutput bytesStream) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/transport/TransportStatus.java b/server/src/main/java/org/opensearch/transport/TransportStatus.java index dab572949e688..b09c8a51dd2b0 100644 --- a/server/src/main/java/org/opensearch/transport/TransportStatus.java +++ b/server/src/main/java/org/opensearch/transport/TransportStatus.java @@ -80,7 +80,7 @@ static boolean isHandshake(byte value) { // pkg private since it's only used int return (value & STATUS_HANDSHAKE) != 0; } - static byte setHandshake(byte value) { // pkg private since it's only used internally + public static byte setHandshake(byte value) { // pkg private since it's only used internally value |= STATUS_HANDSHAKE; return value; } diff --git a/server/src/main/java/org/opensearch/transport/OutboundMessage.java b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessage.java similarity index 87% rename from server/src/main/java/org/opensearch/transport/OutboundMessage.java rename to server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessage.java index 3dafc6fb5eea9..962618e7041e7 100644 --- a/server/src/main/java/org/opensearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessage.java @@ -29,7 +29,7 @@ * GitHub history for details. */ -package org.opensearch.transport; +package org.opensearch.transport.nativeprotocol; import org.opensearch.Version; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -39,6 +39,13 @@ import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.transport.BytesTransportRequest; +import org.opensearch.transport.CompressibleBytesOutputStream; +import org.opensearch.transport.NetworkMessage; +import org.opensearch.transport.ProtocolOutboundMessage; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.TcpHeader; +import org.opensearch.transport.TransportStatus; import java.io.IOException; import java.util.Set; @@ -48,16 +55,17 @@ * * @opensearch.internal */ -abstract class OutboundMessage extends NetworkMessage { +public abstract class NativeOutboundMessage extends NetworkMessage implements ProtocolOutboundMessage { private final Writeable message; - OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) { + NativeOutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) { super(threadContext, version, status, requestId); this.message = message; } - BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { + @Override + public BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { bytesStream.setVersion(version); bytesStream.skip(TcpHeader.headerSize(version)); @@ -119,12 +127,12 @@ protected BytesReference writeMessage(CompressibleBytesOutputStream stream) thro * * @opensearch.internal */ - static class Request extends OutboundMessage { + public static class Request extends NativeOutboundMessage { private final String[] features; private final String action; - Request( + public Request( ThreadContext threadContext, String[] features, Writeable message, @@ -149,7 +157,7 @@ protected void writeVariableHeader(StreamOutput stream) throws IOException { private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) { byte status = 0; status = TransportStatus.setRequest(status); - if (compress && OutboundMessage.canCompress(message)) { + if (compress && NativeOutboundMessage.canCompress(message)) { status = TransportStatus.setCompress(status); } if (isHandshake) { @@ -165,11 +173,11 @@ private static byte setStatus(boolean compress, boolean isHandshake, Writeable m * * @opensearch.internal */ - static class Response extends OutboundMessage { + public static class Response extends NativeOutboundMessage { private final Set features; - Response( + public Response( ThreadContext threadContext, Set features, Writeable message, diff --git a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessageHandler.java b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessageHandler.java new file mode 100644 index 0000000000000..2f4f076a1c34b --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundMessageHandler.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.nativeprotocol; + +import org.opensearch.Version; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.io.stream.ReleasableBytesStreamOutput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.OutboundMessageHandler; +import org.opensearch.transport.ProtocolOutboundMessage; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.StatsTracker; +import org.opensearch.transport.TcpChannel; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Set; + +/** + * Outbound message handler for native transport protocol. + * + * @opensearch.internal + */ +public class NativeOutboundMessageHandler implements OutboundMessageHandler { + + private final String[] features; + private final StatsTracker statsTracker; + private final ThreadPool threadPool; + private final BigArrays bigArrays; + + public NativeOutboundMessageHandler(String[] features, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays) { + this.features = features; + this.statsTracker = statsTracker; + this.threadPool = threadPool; + this.bigArrays = bigArrays; + } + + @Override + public void sendMessage(TcpChannel channel, ProtocolOutboundMessage networkMessage, ActionListener listener) throws IOException { + NativeOutboundMessage nativeOutboundMessage = (NativeOutboundMessage) networkMessage; + MessageSerializer serializer = new MessageSerializer(nativeOutboundMessage, bigArrays); + SendContext sendContext = new SendContext(channel, serializer, listener, serializer, statsTracker); + internalSend(channel, sendContext, threadPool); + } + + @Override + public NativeOutboundMessage.Request convertRequestToOutboundMessage( + final TransportRequest request, + final String action, + final long requestId, + final boolean isHandshake, + final boolean compressRequest, + final Version version + ) { + return new NativeOutboundMessage.Request( + threadPool.getThreadContext(), + features, + request, + version, + action, + requestId, + isHandshake, + compressRequest + ); + } + + @Override + public NativeOutboundMessage.Response convertResponseToOutboundMessage( + final TransportResponse response, + final Set features, + final long requestId, + final boolean isHandshake, + final boolean compress, + final Version version + ) { + return new NativeOutboundMessage.Response( + threadPool.getThreadContext(), + features, + response, + version, + requestId, + isHandshake, + compress + ); + } + + @Override + public NativeOutboundMessage.Response convertErrorResponseToOutboundMessage( + final RemoteTransportException tx, + final Set features, + final long requestId, + final boolean isHandshake, + final boolean compress, + final Version version + ) { + return new NativeOutboundMessage.Response(threadPool.getThreadContext(), features, tx, version, requestId, false, false); + } + + /** + * Internal message serializer + * + * @opensearch.internal + */ + private static class MessageSerializer implements CheckedSupplier, Releasable { + + private final NativeOutboundMessage message; + private final BigArrays bigArrays; + private volatile ReleasableBytesStreamOutput bytesStreamOutput; + + private MessageSerializer(NativeOutboundMessage message, BigArrays bigArrays) { + this.message = message; + this.bigArrays = bigArrays; + } + + @Override + public BytesReference get() throws IOException { + bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); + return message.serialize(bytesStreamOutput); + } + + @Override + public void close() { + IOUtils.closeWhileHandlingException(bytesStreamOutput); + } + } + +} diff --git a/server/src/test/java/org/opensearch/transport/InboundDecoderTests.java b/server/src/test/java/org/opensearch/transport/InboundDecoderTests.java index 4d671443f396e..7bd9b9712c27e 100644 --- a/server/src/test/java/org/opensearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundDecoderTests.java @@ -42,6 +42,7 @@ import org.opensearch.core.transport.TransportMessage; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.transport.nativeprotocol.NativeOutboundMessage; import java.io.IOException; import java.util.ArrayList; @@ -70,9 +71,9 @@ public void testDecode() throws IOException { } else { threadContext.addResponseHeader(headerKey, headerValue); } - OutboundMessage message; + NativeOutboundMessage message; if (isRequest) { - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), @@ -83,7 +84,7 @@ public void testDecode() throws IOException { false ); } else { - message = new OutboundMessage.Response( + message = new NativeOutboundMessage.Response( threadContext, Collections.emptySet(), new TestResponse(randomAlphaOfLength(100)), @@ -142,7 +143,7 @@ public void testDecodeHandshakeCompatibility() throws IOException { final String headerValue = randomAlphaOfLength(20); threadContext.putHeader(headerKey, headerValue); Version handshakeCompatVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); - OutboundMessage message = new OutboundMessage.Request( + NativeOutboundMessage message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), @@ -183,11 +184,11 @@ public void testCompressedDecode() throws IOException { } else { threadContext.addResponseHeader(headerKey, headerValue); } - OutboundMessage message; + NativeOutboundMessage message; TransportMessage transportMessage; if (isRequest) { transportMessage = new TestRequest(randomAlphaOfLength(100)); - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], transportMessage, @@ -199,7 +200,7 @@ public void testCompressedDecode() throws IOException { ); } else { transportMessage = new TestResponse(randomAlphaOfLength(100)); - message = new OutboundMessage.Response( + message = new NativeOutboundMessage.Response( threadContext, Collections.emptySet(), transportMessage, @@ -260,7 +261,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException { final String headerValue = randomAlphaOfLength(20); threadContext.putHeader(headerKey, headerValue); Version handshakeCompatVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); - OutboundMessage message = new OutboundMessage.Request( + NativeOutboundMessage message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), @@ -294,7 +295,7 @@ public void testVersionIncompatibilityDecodeException() throws IOException { String action = "test-request"; long requestId = randomNonNegativeLong(); Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); - OutboundMessage message = new OutboundMessage.Request( + NativeOutboundMessage message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index 2dde27d62e759..4bdf72a266070 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -57,6 +57,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.nativeprotocol.NativeInboundMessage; +import org.opensearch.transport.nativeprotocol.NativeOutboundMessage; import org.junit.After; import org.junit.Before; @@ -195,7 +196,7 @@ public TestResponse read(StreamInput in) throws IOException { ); requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( threadPool.getThreadContext(), new String[0], new TestRequest(requestValue), @@ -393,7 +394,7 @@ public TestResponse read(StreamInput in) throws IOException { requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( threadPool.getThreadContext(), new String[0], new TestRequest(requestValue), @@ -469,7 +470,7 @@ public TestResponse read(StreamInput in) throws IOException { requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( threadPool.getThreadContext(), new String[0], new TestRequest(requestValue), @@ -547,7 +548,7 @@ public TestResponse read(StreamInput in) throws IOException { ); requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( threadPool.getThreadContext(), new String[0], new TestRequest(requestValue), @@ -643,7 +644,7 @@ public TestResponse read(StreamInput in) throws IOException { ); requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( threadPool.getThreadContext(), new String[0], new TestRequest(requestValue), diff --git a/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java b/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java index d54f7e6fd2c2b..5c4cabf952b39 100644 --- a/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.nativeprotocol.NativeInboundMessage; +import org.opensearch.transport.nativeprotocol.NativeOutboundMessage; import java.io.IOException; import java.util.ArrayList; @@ -128,11 +129,11 @@ public void testPipelineHandling() throws IOException { final MessageData messageData; Exception expectedExceptionClass = null; - OutboundMessage message; + NativeOutboundMessage message; if (isRequest) { if (rarely()) { messageData = new MessageData(version, requestId, true, isCompressed, breakThisAction, null); - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(value), @@ -145,7 +146,7 @@ public void testPipelineHandling() throws IOException { expectedExceptionClass = new CircuitBreakingException("", CircuitBreaker.Durability.PERMANENT); } else { messageData = new MessageData(version, requestId, true, isCompressed, actionName, value); - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(value), @@ -158,7 +159,7 @@ public void testPipelineHandling() throws IOException { } } else { messageData = new MessageData(version, requestId, false, isCompressed, null, value); - message = new OutboundMessage.Response( + message = new NativeOutboundMessage.Response( threadContext, Collections.emptySet(), new TestResponse(value), @@ -231,9 +232,9 @@ public void testDecodeExceptionIsPropagated() throws IOException { final boolean isRequest = randomBoolean(); final long requestId = randomNonNegativeLong(); - OutboundMessage message; + NativeOutboundMessage message; if (isRequest) { - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(value), @@ -244,7 +245,7 @@ public void testDecodeExceptionIsPropagated() throws IOException { false ); } else { - message = new OutboundMessage.Response( + message = new NativeOutboundMessage.Response( threadContext, Collections.emptySet(), new TestResponse(value), @@ -285,9 +286,9 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { final boolean isRequest = randomBoolean(); final long requestId = randomNonNegativeLong(); - OutboundMessage message; + NativeOutboundMessage message; if (isRequest) { - message = new OutboundMessage.Request( + message = new NativeOutboundMessage.Request( threadContext, new String[0], new TestRequest(value), @@ -298,7 +299,7 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { false ); } else { - message = new OutboundMessage.Response( + message = new NativeOutboundMessage.Response( threadContext, Collections.emptySet(), new TestResponse(value), diff --git a/server/src/test/java/org/opensearch/transport/TransportLoggerTests.java b/server/src/test/java/org/opensearch/transport/TransportLoggerTests.java index 05296e9308657..fac471af5d9a8 100644 --- a/server/src/test/java/org/opensearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportLoggerTests.java @@ -43,6 +43,7 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; +import org.opensearch.transport.nativeprotocol.NativeOutboundMessage; import java.io.IOException; @@ -93,7 +94,7 @@ public void testLoggingHandler() throws Exception { private BytesReference buildRequest() throws IOException { boolean compress = randomBoolean(); try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - OutboundMessage.Request request = new OutboundMessage.Request( + NativeOutboundMessage.Request request = new NativeOutboundMessage.Request( new ThreadContext(Settings.EMPTY), new String[0], new ClusterStatsRequest(),