Skip to content

Commit

Permalink
Cleaning up
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed May 7, 2024
1 parent c781428 commit c904eda
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/transport/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Tuple<Map<String, String>, Map<String, Set<String>>> getHeaders() {
return headers;
}

public void finishParsingHeader(StreamInput input) throws IOException {
void finishParsingHeader(StreamInput input) throws IOException {
this.headers = ThreadContext.readHeadersFromStream(input);

if (isRequest()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private int headerBytesToRead(BytesReference reference) {
}

// exposed for use in tests
public static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException {
static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException {
try (StreamInput streamInput = bytesReference.streamInput()) {
streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
long requestId = streamInput.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.transport.TransportMessage;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand All @@ -58,7 +59,8 @@ protected abstract BytesReference serialize(
boolean handshake,
boolean compress,
String action,
long requestId
long requestId,
Writeable transportMessage
) throws IOException;

@Override
Expand All @@ -73,13 +75,15 @@ public void testDecode() throws IOException {
long requestId = randomNonNegativeLong();
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
final BytesReference totalBytes;
if (isRequest) {
threadContext.putHeader(headerKey, headerValue);
totalBytes = serialize(isRequest, Version.CURRENT, false, false, action, requestId, new TestRequest(randomAlphaOfLength(100)));
} else {
threadContext.addResponseHeader(headerKey, headerValue);
totalBytes = serialize(isRequest, Version.CURRENT, false, false, action, requestId, new TestResponse(randomAlphaOfLength(100)));
}

final BytesReference totalBytes = serialize(isRequest, Version.CURRENT, false, false, action, requestId);
int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);

Expand Down Expand Up @@ -128,7 +132,15 @@ public void testDecodeHandshakeCompatibility() throws IOException {
threadContext.putHeader(headerKey, headerValue);
Version handshakeCompatVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();

final BytesReference bytes = serialize(true, handshakeCompatVersion, true, false, action, requestId);
final BytesReference bytes = serialize(
true,
handshakeCompatVersion,
true,
false,
action,
requestId,
new TestRequest(randomAlphaOfLength(100))
);
int totalHeaderSize = TcpHeader.headerSize(handshakeCompatVersion) + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);

InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
Expand Down Expand Up @@ -159,16 +171,15 @@ public void testCompressedDecode() throws IOException {
threadContext.addResponseHeader(headerKey, headerValue);
}
TransportMessage transportMessage;
BytesReference totalBytes;
final BytesReference totalBytes;
if (isRequest) {
transportMessage = new TestRequest(randomAlphaOfLength(100));
totalBytes = serialize(true, Version.CURRENT, false, true, action, requestId);
totalBytes = serialize(true, Version.CURRENT, false, true, action, requestId, transportMessage);
} else {
transportMessage = new TestResponse(randomAlphaOfLength(100));
totalBytes = serialize(false, Version.CURRENT, false, true, action, requestId);
totalBytes = serialize(false, Version.CURRENT, false, true, action, requestId, transportMessage);
}

// final BytesReference totalBytes = message.serialize(new BytesStreamOutput());
final BytesStreamOutput out = new BytesStreamOutput();
transportMessage.writeTo(out);
final BytesReference uncompressedBytes = out.bytes();
Expand Down Expand Up @@ -219,7 +230,15 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {
threadContext.putHeader(headerKey, headerValue);
Version handshakeCompatVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();

final BytesReference bytes = serialize(true, handshakeCompatVersion, true, true, action, requestId);
final BytesReference bytes = serialize(
true,
handshakeCompatVersion,
true,
true,
action,
requestId,
new TestRequest(randomAlphaOfLength(100))
);
int totalHeaderSize = TcpHeader.headerSize(handshakeCompatVersion) + bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);

InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
Expand All @@ -243,7 +262,15 @@ public void testVersionIncompatibilityDecodeException() throws IOException {
long requestId = randomNonNegativeLong();
Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();

final BytesReference bytes = serialize(true, incompatibleVersion, false, true, action, requestId);
final BytesReference bytes = serialize(
true,
incompatibleVersion,
false,
true,
action,
requestId,
new TestRequest(randomAlphaOfLength(100))
);

InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
final ArrayList<Object> fragments = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.transport.InboundDecoderTests;
import org.opensearch.transport.TestRequest;
import org.opensearch.transport.TestResponse;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -27,14 +26,15 @@ protected BytesReference serialize(
boolean handshake,
boolean compress,
String action,
long requestId
long requestId,
Writeable transportMessage
) throws IOException {
NativeOutboundMessage message;
if (isRequest) {
message = new NativeOutboundMessage.Request(
threadContext,
new String[0],
new TestRequest(randomAlphaOfLength(100)),
transportMessage,
version,
action,
requestId,
Expand All @@ -45,7 +45,7 @@ protected BytesReference serialize(
message = new NativeOutboundMessage.Response(
threadContext,
Collections.emptySet(),
new TestResponse(randomAlphaOfLength(100)),
transportMessage,
version,
requestId,
handshake,
Expand Down

0 comments on commit c904eda

Please sign in to comment.