diff --git a/build.gradle b/build.gradle index 8c992f2b..db612e37 100644 --- a/build.gradle +++ b/build.gradle @@ -44,6 +44,7 @@ configurations { dependencies { implementation "org.opensearch:opensearch:3.0.0-SNAPSHOT" + implementation "org.opensearch.plugin:transport-netty4-client:3.0.0-SNAPSHOT" implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.17.1' implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.17.1' implementation "io.netty:netty-all:4.1.73.Final" @@ -51,15 +52,6 @@ dependencies { exclude module : 'hamcrest' exclude module : 'hamcrest-core' } - // Keeping it for now. Will remove it later once figure out the right dependencies -// implementation "io.netty:netty-buffer:4.1.73.Final" -// implementation "io.netty:netty-codec:4.1.73.Final" -// implementation "io.netty:netty-codec-http:4.1.73.Final" -// implementation "io.netty:netty-common:4.1.73.Final" -// implementation "io.netty:netty-handler:4.1.73.Final" -// implementation "io.netty:netty-resolver:4.1.73.Final" -// implementation "io.netty:netty-transportservice.transport:4.1.73.Final" - //implementation "org.apache.logging.log4j:log4j-1.2-api:2.17.1" implementation 'javax.xml.bind:jaxb-api:2.2.2' implementation 'com.fasterxml.jackson.core:jackson-databind: 2.12.6.1' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml: 2.12.6.1' @@ -80,9 +72,6 @@ task requireJavadoc(type: JavaExec) { args "--dont-require-private=true" // javadocs on trivial getters/setters optional args "--dont-require-trivial-properties" - // the netty4 package will eventually be published to mavenCentral - // See https://github.com/opensearch-project/OpenSearch/issues/3118 - args "--exclude=netty4" } check.dependsOn requireJavadoc diff --git a/gradle/formatting.gradle b/gradle/formatting.gradle index 538c0d15..16d698b0 100644 --- a/gradle/formatting.gradle +++ b/gradle/formatting.gradle @@ -25,7 +25,6 @@ allprojects { format("license", { licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package "); target("src/main/java/**/*.java") - targetExclude("**/netty4/*") }) } } diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index 712952ae..048c3402 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -30,11 +30,11 @@ import org.opensearch.indices.IndicesModule; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.transport.netty4.Netty4Transport; +import org.opensearch.transport.SharedGroupFactory; import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler; import org.opensearch.sdk.handlers.ClusterStateResponseHandler; import org.opensearch.sdk.handlers.LocalNodeResponseHandler; -import org.opensearch.sdk.netty4.Netty4Transport; -import org.opensearch.sdk.netty4.SharedGroupFactory; import org.opensearch.search.SearchModule; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ClusterConnectionManager; diff --git a/src/main/java/org/opensearch/sdk/netty4/CopyBytesServerSocketChannel.java b/src/main/java/org/opensearch/sdk/netty4/CopyBytesServerSocketChannel.java deleted file mode 100644 index 7a04e04d..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/CopyBytesServerSocketChannel.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2012 The transportservice.Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ -package org.opensearch.sdk.netty4; - -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.util.internal.SocketUtils; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.nio.channels.SocketChannel; -import java.util.List; - -/** - * This class is adapted from {@link NioServerSocketChannel} class in the transportservice.netty4.Netty project. It overrides the - * channel read messages behavior to ensure that a {@link CopyBytesSocketChannel} socket channel is created. - */ -public class CopyBytesServerSocketChannel extends NioServerSocketChannel { - - private static final Logger logger = LogManager.getLogger(CopyBytesServerSocketChannel.class); - - @Override - protected int doReadMessages(List buf) throws Exception { - SocketChannel ch = SocketUtils.accept(javaChannel()); - - try { - if (ch != null) { - buf.add(new CopyBytesSocketChannel(this, ch)); - return 1; - } - } catch (Throwable t) { - logger.warn("Failed to create a new channel from an accepted socket.", t); - - try { - ch.close(); - } catch (Throwable t2) { - logger.warn("Failed to close a socket.", t2); - } - } - - return 0; - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/CopyBytesSocketChannel.java b/src/main/java/org/opensearch/sdk/netty4/CopyBytesSocketChannel.java deleted file mode 100644 index 3cc52ba7..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/CopyBytesSocketChannel.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2012 The transportservice.netty4.Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.nio.NioSocketChannel; - -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.unit.ByteSizeValue; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; - -/** - * This class is adapted from {@link NioSocketChannel} class in the transportservice.Netty project. It overrides the channel - * read/write behavior to ensure that the bytes are always copied to a thread-local direct bytes buffer. This - * happens BEFORE the call to the Java {@link SocketChannel} is issued. - * - * The purpose of this class is to allow the disabling of netty direct buffer pooling while allowing us to - * control how bytes end up being copied to direct memory. If we simply disabled netty pooling, we would rely - * on the JDK's internal thread local buffer pooling. Instead, this class allows us to create a one thread - * local buffer with a defined size. - */ -@SuppressForbidden(reason = "Channel#write") -public class CopyBytesSocketChannel extends Netty4NioSocketChannel { - - private static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact( - ByteSizeValue.parseBytesSizeValue( - System.getProperty("opensearch.transportservice.transport.buffer.size", "1m"), - "opensearch.transportservice.transport.buffer.size" - ).getBytes() - ); - - private static final ThreadLocal ioBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(MAX_BYTES_PER_WRITE)); - private final WriteConfig writeConfig = new WriteConfig(); - - public CopyBytesSocketChannel() { - super(); - } - - CopyBytesSocketChannel(Channel parent, SocketChannel socket) { - super(parent, socket); - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - int writeSpinCount = config().getWriteSpinCount(); - do { - if (in.isEmpty()) { - // All written so clear OP_WRITE - clearOpWrite(); - // Directly return here so incompleteWrite(...) is not called. - return; - } - - // Ensure the pending writes are made of ByteBufs only. - int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite(); - ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); - int nioBufferCnt = in.nioBufferCount(); - - if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes. - writeSpinCount -= doWrite0(in); - } else { - // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need - // to check if the total size of all the buffers is non-zero. - ByteBuffer ioBuffer = getIoBuffer(); - copyBytes(nioBuffers, nioBufferCnt, ioBuffer); - ioBuffer.flip(); - - int attemptedBytes = ioBuffer.remaining(); - final int localWrittenBytes = writeToSocketChannel(javaChannel(), ioBuffer); - if (localWrittenBytes <= 0) { - incompleteWrite(true); - return; - } - adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); - setWrittenBytes(nioBuffers, localWrittenBytes); - in.removeBytes(localWrittenBytes); - --writeSpinCount; - } - } while (writeSpinCount > 0); - - incompleteWrite(writeSpinCount < 0); - } - - @Override - protected int doReadBytes(ByteBuf byteBuf) throws Exception { - final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - int writeableBytes = Math.min(byteBuf.writableBytes(), MAX_BYTES_PER_WRITE); - allocHandle.attemptedBytesRead(writeableBytes); - ByteBuffer ioBuffer = getIoBuffer(); - ioBuffer.limit(writeableBytes); - int bytesRead = readFromSocketChannel(javaChannel(), ioBuffer); - ioBuffer.flip(); - if (bytesRead > 0) { - byteBuf.writeBytes(ioBuffer); - } - return bytesRead; - } - - // Protected so that tests can verify behavior and simulate partial writes - protected int writeToSocketChannel(SocketChannel socketChannel, ByteBuffer ioBuffer) throws IOException { - return socketChannel.write(ioBuffer); - } - - // Protected so that tests can verify behavior - protected int readFromSocketChannel(SocketChannel socketChannel, ByteBuffer ioBuffer) throws IOException { - return socketChannel.read(ioBuffer); - } - - private static ByteBuffer getIoBuffer() { - ByteBuffer ioBuffer = CopyBytesSocketChannel.ioBuffer.get(); - ioBuffer.clear(); - return ioBuffer; - } - - private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) { - // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change - // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try - // make a best effort to adjust as OS behavior changes. - if (attempted == written) { - if (attempted << 1 > oldMaxBytesPerGatheringWrite) { - writeConfig.setMaxBytesPerGatheringWrite(attempted << 1); - } - } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) { - writeConfig.setMaxBytesPerGatheringWrite(attempted >>> 1); - } - } - - private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) { - for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) { - ByteBuffer buffer = source[i]; - int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining()); - if (buffer.hasArray()) { - destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy); - } else { - int initialLimit = buffer.limit(); - int initialPosition = buffer.position(); - buffer.limit(buffer.position() + nBytesToCopy); - destination.put(buffer); - buffer.position(initialPosition); - buffer.limit(initialLimit); - } - } - } - - private static void setWrittenBytes(ByteBuffer[] source, int bytesWritten) { - for (int i = 0; bytesWritten > 0; i++) { - ByteBuffer buffer = source[i]; - int nBytes = Math.min(buffer.remaining(), bytesWritten); - buffer.position(buffer.position() + nBytes); - bytesWritten = bytesWritten - nBytes; - } - } - - private final class WriteConfig { - - private volatile int maxBytesPerGatheringWrite = MAX_BYTES_PER_WRITE; - - private WriteConfig() { - calculateMaxBytesPerGatheringWrite(); - } - - void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) { - this.maxBytesPerGatheringWrite = Math.min(maxBytesPerGatheringWrite, MAX_BYTES_PER_WRITE); - } - - int getMaxBytesPerGatheringWrite() { - return maxBytesPerGatheringWrite; - } - - private void calculateMaxBytesPerGatheringWrite() { - // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide. - int newSendBufferSize = config().getSendBufferSize() << 1; - if (newSendBufferSize > 0) { - setMaxBytesPerGatheringWrite(config().getSendBufferSize() << 1); - } - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4MessageChannelHandler.java b/src/main/java/org/opensearch/sdk/netty4/Netty4MessageChannelHandler.java deleted file mode 100644 index 72ec3e41..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4MessageChannelHandler.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - -import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.util.PageCacheRecycler; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.InboundPipeline; -import org.opensearch.transport.Transport; -import org.opensearch.transport.Transports; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Queue; - -/** - * A handler (must be the last one!) that does size based frame decoding and forwards the actual message - * to the relevant action. - */ -final class Netty4MessageChannelHandler extends ChannelDuplexHandler { - - private final Netty4Transport transport; - - private final Queue queuedWrites = new ArrayDeque<>(); - - private WriteOperation currentWrite; - private final InboundPipeline pipeline; - - Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) { - this.transport = transport; - final ThreadPool threadPool = transport.getThreadPool(); - final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers(); - this.pipeline = new InboundPipeline( - transport.getVersion(), - transport.getStatsTracker(), - recycler, - threadPool::relativeTimeInMillis, - transport.getInflightBreaker(), - requestHandlers::getHandler, - transport::inboundMessage - ); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - assert Transports.assertTransportThread(); - assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass(); - - final ByteBuf buffer = (ByteBuf) msg; - Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); - final BytesReference wrapped = Netty4Utils.toBytesReference(buffer); - System.out.println("MESSAGE RECEIVED:" + wrapped.utf8ToString()); - try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) { - pipeline.handleBytes(channel, reference); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - final Throwable unwrapped = ExceptionsHelper.unwrap(cause, OpenSearchException.class); - final Throwable newCause = unwrapped != null ? unwrapped : cause; - Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); - if (newCause instanceof Error) { - transport.onException(tcpChannel, new Exception(newCause)); - } else { - transport.onException(tcpChannel, (Exception) newCause); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - assert msg instanceof ByteBuf; - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise)); - assert queued; - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) { - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - if (ctx.channel().isWritable()) { - doFlush(ctx); - } - ctx.fireChannelWritabilityChanged(); - } - - @Override - public void flush(ChannelHandlerContext ctx) { - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - Channel channel = ctx.channel(); - if (channel.isWritable() || channel.isActive() == false) { - doFlush(ctx); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - doFlush(ctx); - // Releasables.closeWhileHandlingException(pipeline); - super.channelInactive(ctx); - } - - private void doFlush(ChannelHandlerContext ctx) { - assert ctx.executor().inEventLoop(); - final Channel channel = ctx.channel(); - if (channel.isActive() == false) { - if (currentWrite != null) { - currentWrite.promise.tryFailure(new ClosedChannelException()); - } - failQueuedWrites(); - return; - } - while (channel.isWritable()) { - if (currentWrite == null) { - currentWrite = queuedWrites.poll(); - } - if (currentWrite == null) { - break; - } - final WriteOperation write = currentWrite; - if (write.buf.readableBytes() == 0) { - write.promise.trySuccess(); - currentWrite = null; - continue; - } - final int readableBytes = write.buf.readableBytes(); - final int bufferSize = Math.min(readableBytes, 1 << 18); - final int readerIndex = write.buf.readerIndex(); - final boolean sliced = readableBytes != bufferSize; - final ByteBuf writeBuffer; - if (sliced) { - writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize); - write.buf.readerIndex(readerIndex + bufferSize); - } else { - writeBuffer = write.buf; - } - final ChannelFuture writeFuture = ctx.write(writeBuffer); - if (sliced == false || write.buf.readableBytes() == 0) { - currentWrite = null; - writeFuture.addListener(future -> { - assert ctx.executor().inEventLoop(); - if (future.isSuccess()) { - write.promise.trySuccess(); - } else { - write.promise.tryFailure(future.cause()); - } - }); - } else { - writeFuture.addListener(future -> { - assert ctx.executor().inEventLoop(); - if (future.isSuccess() == false) { - write.promise.tryFailure(future.cause()); - } - }); - } - ctx.flush(); - if (channel.isActive() == false) { - failQueuedWrites(); - return; - } - } - } - - private void failQueuedWrites() { - WriteOperation queuedWrite; - while ((queuedWrite = queuedWrites.poll()) != null) { - queuedWrite.promise.tryFailure(new ClosedChannelException()); - } - } - - private static final class WriteOperation { - - private final ByteBuf buf; - - private final ChannelPromise promise; - - WriteOperation(ByteBuf buf, ChannelPromise promise) { - this.buf = buf; - this.promise = promise; - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4NioSocketChannel.java b/src/main/java/org/opensearch/sdk/netty4/Netty4NioSocketChannel.java deleted file mode 100644 index 1c1d72be..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4NioSocketChannel.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioSocketChannel; - -import java.nio.channels.SocketChannel; - -/** - * Helper class to expose {@link #javaChannel()} method - */ -public class Netty4NioSocketChannel extends NioSocketChannel { - - public Netty4NioSocketChannel() { - super(); - } - - public Netty4NioSocketChannel(Channel parent, SocketChannel socket) { - super(parent, socket); - } - - @Override - public SocketChannel javaChannel() { - return super.javaChannel(); - } - -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4TcpChannel.java b/src/main/java/org/opensearch/sdk/netty4/Netty4TcpChannel.java deleted file mode 100644 index eb7bf298..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4TcpChannel.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPromise; -import org.opensearch.ExceptionsHelper; -import org.opensearch.action.ActionListener; -import org.opensearch.common.Nullable; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.concurrent.CompletableContext; -import org.opensearch.transport.TcpChannel; -import org.opensearch.transport.TransportException; - -import java.net.InetSocketAddress; - -public class Netty4TcpChannel implements TcpChannel { - - private final Channel channel; - private final boolean isServer; - private final String profile; - private final CompletableContext connectContext; - private final CompletableContext closeContext = new CompletableContext<>(); - private final ChannelStats stats = new ChannelStats(); - - public Netty4TcpChannel(Channel channel, boolean isServer, String profile, @Nullable ChannelFuture connectFuture) { - this.channel = channel; - this.isServer = isServer; - this.profile = profile; - this.connectContext = new CompletableContext<>(); - addListener(this.channel.closeFuture(), closeContext); - addListener(connectFuture, connectContext); - } - - /** - * Adds a listener that completes the given {@link CompletableContext} to the given {@link ChannelFuture}. - * @param channelFuture Channel future - * @param context Context to complete - */ - public static void addListener(ChannelFuture channelFuture, CompletableContext context) { - channelFuture.addListener(f -> { - if (f.isSuccess()) { - context.complete(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - context.completeExceptionally(new Exception(cause)); - } else { - context.completeExceptionally((Exception) cause); - } - } - }); - } - - /** - * Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener} - * on its completion. - * @param listener lister to invoke - * @param channel channel - * @return write promise - */ - public static ChannelPromise addPromise(ActionListener listener, Channel channel) { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - final Throwable cause = f.cause(); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - if (cause instanceof Error) { - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - return writePromise; - } - - @Override - public void close() { - channel.close(); - } - - @Override - public boolean isServerChannel() { - return isServer; - } - - @Override - public String getProfile() { - return profile; - } - - @Override - public void addCloseListener(ActionListener listener) { - closeContext.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public void addConnectListener(ActionListener listener) { - connectContext.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public ChannelStats getChannelStats() { - return stats; - } - - @Override - public boolean isOpen() { - return channel.isOpen(); - } - - @Override - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) channel.remoteAddress(); - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel)); - - if (channel.eventLoop().isShutdown()) { - listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); - } - } - - public Channel getNettyChannel() { - return channel; - } - - @Override - public String toString() { - return "Netty4TcpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + channel.remoteAddress() + '}'; - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4TcpServerChannel.java b/src/main/java/org/opensearch/sdk/netty4/Netty4TcpServerChannel.java deleted file mode 100644 index ce9a52a6..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4TcpServerChannel.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.channel.Channel; -import org.opensearch.action.ActionListener; -import org.opensearch.common.concurrent.CompletableContext; -import org.opensearch.transport.TcpServerChannel; - -import java.net.InetSocketAddress; - -public class Netty4TcpServerChannel implements TcpServerChannel { - - private final Channel channel; - private final CompletableContext closeContext = new CompletableContext<>(); - - public Netty4TcpServerChannel(Channel channel) { - this.channel = channel; - Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); - } - - @Override - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); - } - - @Override - public void close() { - channel.close(); - } - - @Override - public void addCloseListener(ActionListener listener) { - closeContext.addListener(ActionListener.toBiConsumer(listener)); - } - - @Override - public boolean isOpen() { - return channel.isOpen(); - } - - @Override - public String toString() { - return "Netty4TcpChannel{" + "localAddress=" + getLocalAddress() + '}'; - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4Transport.java b/src/main/java/org/opensearch/sdk/netty4/Netty4Transport.java deleted file mode 100644 index 96489f67..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4Transport.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.nio.NioChannelOption; -import io.netty.util.AttributeKey; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; -import org.opensearch.Version; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.lease.Releasables; -import org.opensearch.common.network.NetworkService; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.ByteSizeValue; -import org.opensearch.common.util.PageCacheRecycler; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.core.internal.net.NetUtils; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.indices.breaker.CircuitBreakerService; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TcpTransport; -import org.opensearch.transport.TransportSettings; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketOption; -import java.util.Map; - -import static org.opensearch.common.settings.Setting.byteSizeSetting; -import static org.opensearch.common.settings.Setting.intSetting; -import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; - -/** - * There are 4 types of connections per node, low/med/high/ping. Low if for batch oriented APIs (like recovery or - * batch) with high payload that will cause regular request. (like search or single index) to take - * longer. Med is for the typical search / single doc index. And High for things like cluster state. Ping is reserved for - * sending out ping requests to other nodes. - */ -public class Netty4Transport extends TcpTransport { - private static final Logger logger = LogManager.getLogger(Netty4Transport.class); - - public static final Setting WORKER_COUNT = new Setting<>( - "transport.netty.worker_count", - (s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s)), - (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), - Property.NodeScope - ); - - public static final Setting NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting( - "transport.netty.receive_predictor_size", - new ByteSizeValue(64, ByteSizeUnit.KB), - Property.NodeScope - ); - public static final Setting NETTY_RECEIVE_PREDICTOR_MIN = byteSizeSetting( - "transport.netty.receive_predictor_min", - NETTY_RECEIVE_PREDICTOR_SIZE, - Property.NodeScope - ); - public static final Setting NETTY_RECEIVE_PREDICTOR_MAX = byteSizeSetting( - "transport.netty.receive_predictor_max", - NETTY_RECEIVE_PREDICTOR_SIZE, - Property.NodeScope - ); - public static final Setting NETTY_BOSS_COUNT = intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope); - - private final SharedGroupFactory sharedGroupFactory; - private final RecvByteBufAllocator recvByteBufAllocator; - private final ByteSizeValue receivePredictorMin; - private final ByteSizeValue receivePredictorMax; - private final Map serverBootstraps = newConcurrentMap(); - private volatile Bootstrap clientBootstrap; - private volatile SharedGroupFactory.SharedGroup sharedGroup; - - public Netty4Transport( - Settings settings, - Version version, - ThreadPool threadPool, - NetworkService networkService, - PageCacheRecycler pageCacheRecycler, - NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService, - SharedGroupFactory sharedGroupFactory - ) { - super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); - Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); - NettyAllocator.logAllocatorDescriptionIfNeeded(); - this.sharedGroupFactory = sharedGroupFactory; - - // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one - this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings); - this.receivePredictorMax = NETTY_RECEIVE_PREDICTOR_MAX.get(settings); - if (receivePredictorMax.getBytes() == receivePredictorMin.getBytes()) { - recvByteBufAllocator = new FixedRecvByteBufAllocator((int) receivePredictorMax.getBytes()); - } else { - recvByteBufAllocator = new AdaptiveRecvByteBufAllocator( - (int) receivePredictorMin.getBytes(), - (int) receivePredictorMin.getBytes(), - (int) receivePredictorMax.getBytes() - ); - } - } - - @Override - protected void doStart() { - boolean success = false; - try { - sharedGroup = sharedGroupFactory.getTransportGroup(); - clientBootstrap = createClientBootstrap(sharedGroup); - if (NetworkService.NETWORK_SERVER.get(settings)) { - for (ProfileSettings profileSettings : profileSettings) { - createServerBootstrap(profileSettings, sharedGroup); - bindServer(profileSettings); - } - } - super.doStart(); - success = true; - } finally { - if (success == false) { - doStop(); - } - } - } - - private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) { - final Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(sharedGroup.getLowLevelGroup()); - - // NettyAllocator will return the channel type designed to work with the configured allocator - assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType()); - bootstrap.channel(NettyAllocator.getChannelType()); - bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); - - bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings)); - bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings)); - if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) { - // Note that Netty logs a warning if it can't set the option - if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) { - final SocketOption keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull(); - if (keepIdleOption != null) { - bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings)); - } - } - if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) { - final SocketOption keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull(); - if (keepIntervalOption != null) { - bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings)); - } - } - if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) { - final SocketOption keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull(); - if (keepCountOption != null) { - bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings)); - } - } - } - - final ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings); - if (tcpSendBufferSize.getBytes() > 0) { - bootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes())); - } - - final ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings); - if (tcpReceiveBufferSize.getBytes() > 0) { - bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes())); - } - - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); - - final boolean reuseAddress = TransportSettings.TCP_REUSE_ADDRESS.get(settings); - bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); - - return bootstrap; - } - - private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) { - String name = profileSettings.profileName; - if (logger.isDebugEnabled()) { - logger.debug( - "using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]", - name, - sharedGroupFactory.getTransportWorkerCount(), - profileSettings.portOrRange, - profileSettings.bindHosts, - profileSettings.publishHosts, - receivePredictorMin, - receivePredictorMax - ); - } - - final ServerBootstrap serverBootstrap = new ServerBootstrap(); - - serverBootstrap.group(sharedGroup.getLowLevelGroup()); - - // NettyAllocator will return the channel type designed to work with the configuredAllocator - serverBootstrap.channel(NettyAllocator.getServerChannelType()); - - // Set the allocators for both the server channel and the child channels created - serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); - serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); - - serverBootstrap.childHandler(getServerChannelInitializer(name)); - serverBootstrap.handler(new ServerChannelExceptionHandler()); - - serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); - serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); - if (profileSettings.tcpKeepAlive) { - // Note that Netty logs a warning if it can't set the option - if (profileSettings.tcpKeepIdle >= 0) { - final SocketOption keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull(); - if (keepIdleOption != null) { - serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle); - } - } - if (profileSettings.tcpKeepInterval >= 0) { - final SocketOption keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull(); - if (keepIntervalOption != null) { - serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval); - } - - } - if (profileSettings.tcpKeepCount >= 0) { - final SocketOption keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull(); - if (keepCountOption != null) { - serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount); - } - } - } - - if (profileSettings.sendBufferSize.getBytes() != -1) { - serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes())); - } - - if (profileSettings.receiveBufferSize.getBytes() != -1) { - serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt())); - } - - serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); - serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator); - - serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); - serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress); - serverBootstrap.validate(); - - serverBootstraps.put(name, serverBootstrap); - } - - protected ChannelHandler getServerChannelInitializer(String name) { - return new ServerChannelInitializer(name); - } - - protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) { - return new ClientChannelInitializer(); - } - - static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); - static final AttributeKey SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); - - @Override - protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOException { - InetSocketAddress address = node.getAddress().address(); - Bootstrap bootstrapWithHandler = clientBootstrap.clone(); - bootstrapWithHandler.handler(getClientChannelInitializer(node)); - bootstrapWithHandler.remoteAddress(address); - ChannelFuture connectFuture = bootstrapWithHandler.connect(); - - Channel channel = connectFuture.channel(); - if (channel == null) { - ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause()); - throw new IOException(connectFuture.cause()); - } - - Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture); - channel.attr(CHANNEL_KEY).set(nettyChannel); - - return nettyChannel; - } - - @Override - protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { - Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel(); - Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel); - channel.attr(SERVER_CHANNEL_KEY).set(esChannel); - return esChannel; - } - - @Override - @SuppressForbidden(reason = "debug") - protected void stopInternal() { - Releasables.close(() -> { - if (sharedGroup != null) { - sharedGroup.shutdown(); - } - }, serverBootstraps::clear, () -> clientBootstrap = null); - } - - protected class ClientChannelInitializer extends ChannelInitializer { - - @Override - protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); - assert ch instanceof Netty4NioSocketChannel; - NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); - ch.pipeline().addLast("logging", new OpenSearchLoggingHandler()); - // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this)); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - super.exceptionCaught(ctx, cause); - } - } - - protected class ServerChannelInitializer extends ChannelInitializer { - - protected final String name; - private final NettyByteBufSizer sizer = new NettyByteBufSizer(); - - protected ServerChannelInitializer(String name) { - this.name = name; - } - - @Override - protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); - assert ch instanceof Netty4NioSocketChannel; - NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); - Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture()); - ch.attr(CHANNEL_KEY).set(nettyTcpChannel); - ch.pipeline().addLast("byte_buf_sizer", sizer); - ch.pipeline().addLast("logging", new OpenSearchLoggingHandler()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this)); - serverAcceptedChannel(nettyTcpChannel); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - super.exceptionCaught(ctx, cause); - } - } - - private void addClosedExceptionLogger(Channel channel) { - channel.closeFuture().addListener(f -> { - if (f.isSuccess() == false) { - logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", channel), f.cause()); - } - }); - } - - @ChannelHandler.Sharable - private class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter { - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); - if (cause instanceof Error) { - onServerException(serverChannel, new Exception(cause)); - } else { - onServerException(serverChannel, (Exception) cause); - } - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/Netty4Utils.java b/src/main/java/org/opensearch/sdk/netty4/Netty4Utils.java deleted file mode 100644 index 47ff20c6..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/Netty4Utils.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.NettyRuntime; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; -import org.opensearch.common.Booleans; -import org.opensearch.common.bytes.BytesArray; -import org.opensearch.common.bytes.BytesReference; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicBoolean; - -public class Netty4Utils { - - private static final AtomicBoolean isAvailableProcessorsSet = new AtomicBoolean(); - - /** - * Set the number of available processors that transportservice.netty4.Netty uses for sizing various resources (e.g., thread pools). - * - * @param availableProcessors the number of available processors - * @throws IllegalStateException if available processors was set previously and the specified value does not match the already-set value - */ - public static void setAvailableProcessors(final int availableProcessors) { - // we set this to false in tests to avoid tests that randomly set processors from stepping on each other - final boolean set = Booleans.parseBoolean(System.getProperty("opensearch.set.netty.runtime.available.processors", "true")); - if (!set) { - return; - } - - /* - * This can be invoked twice, once from Netty4Transport and another time from Netty4HttpServerTransport; however, - * Netty4Runtime#availableProcessors forbids settings the number of processors twice so we prevent double invocation here. - */ - if (isAvailableProcessorsSet.compareAndSet(false, true)) { - NettyRuntime.setAvailableProcessors(availableProcessors); - } else if (availableProcessors != NettyRuntime.availableProcessors()) { - /* - * We have previously set the available processors yet either we are trying to set it to a different value now or there is a bug - * in transportservice.Netty and our previous value did not take, bail. - */ - final String message = String.format( - Locale.ROOT, - "available processors value [%d] did not match current value [%d]", - availableProcessors, - NettyRuntime.availableProcessors() - ); - throw new IllegalStateException(message); - } - } - - /** - * Turns the given BytesReference into a ByteBuf. Note: the returned ByteBuf will reference the internal - * pages of the BytesReference. Don't free the bytes of reference before the ByteBuf goes out of scope. - * - * @param reference The reference to change into a ByteBuf. - * @return The reference as a ByteBuf. - */ - public static ByteBuf toByteBuf(final BytesReference reference) { - if (reference.length() == 0) { - return Unpooled.EMPTY_BUFFER; - } - final BytesRefIterator iterator = reference.iterator(); - // usually we have one, two, or three components from the header, the message, and a buffer - final List buffers = new ArrayList<>(3); - try { - BytesRef slice; - while ((slice = iterator.next()) != null) { - buffers.add(Unpooled.wrappedBuffer(slice.bytes, slice.offset, slice.length)); - } - - if (buffers.size() == 1) { - return buffers.get(0); - } else { - CompositeByteBuf composite = Unpooled.compositeBuffer(buffers.size()); - composite.addComponents(true, buffers); - return composite; - } - } catch (IOException ex) { - throw new AssertionError("no IO happens here", ex); - } - } - - /** - * Wraps the given ChannelBuffer with a BytesReference - * - * @param buffer The ByteBuf to wrap. - * @return The wrapped ByteBuf as a BytesReference. - */ - public static BytesReference toBytesReference(final ByteBuf buffer) { - final int readableBytes = buffer.readableBytes(); - if (readableBytes == 0) { - return BytesArray.EMPTY; - } else if (buffer.hasArray()) { - return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), readableBytes); - } else { - final ByteBuffer[] byteBuffers = buffer.nioBuffers(); - return BytesReference.fromByteBuffers(byteBuffers); - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/NettyAllocator.java b/src/main/java/org/opensearch/sdk/netty4/NettyAllocator.java deleted file mode 100644 index 03bada21..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/NettyAllocator.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ServerChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.Booleans; -import org.opensearch.common.unit.ByteSizeValue; -import org.opensearch.monitor.jvm.JvmInfo; - -import java.util.concurrent.atomic.AtomicBoolean; - -public class NettyAllocator { - - private static final Logger logger = LogManager.getLogger(NettyAllocator.class); - private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false); - - private static final long SUGGESTED_MAX_ALLOCATION_SIZE; - private static final ByteBufAllocator ALLOCATOR; - private static final String DESCRIPTION; - - private static final String USE_UNPOOLED = "opensearch.use_unpooled_allocator"; - private static final String USE_NETTY_DEFAULT = "opensearch.unsafe.use_netty_default_allocator"; - private static final String USE_NETTY_DEFAULT_CHUNK = "opensearch.unsafe.use_netty_default_chunk_and_page_size"; - - static { - if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) { - ALLOCATOR = ByteBufAllocator.DEFAULT; - SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024; - DESCRIPTION = "[name=netty_default, suggested_max_allocation_size=" - + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE) - + ", factors={opensearch.unsafe.use_netty_default_allocator=true}]"; - } else { - final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); - final boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC()); - final long g1gcRegionSizeInBytes = JvmInfo.jvmInfo().getG1RegionSize(); - final boolean g1gcRegionSizeIsKnown = g1gcRegionSizeInBytes != -1; - ByteSizeValue heapSize = new ByteSizeValue(heapSizeInBytes); - ByteSizeValue g1gcRegionSize = new ByteSizeValue(g1gcRegionSizeInBytes); - - ByteBufAllocator delegate; - if (useUnpooled(heapSizeInBytes, g1gcEnabled, g1gcRegionSizeIsKnown, g1gcRegionSizeInBytes)) { - delegate = UnpooledByteBufAllocator.DEFAULT; - if (g1gcEnabled && g1gcRegionSizeIsKnown) { - // Suggested max allocation size 1/4 of region size. Guard against unknown edge cases - // where this value would be less than 256KB. - SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024); - } else { - SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024; - } - DESCRIPTION = "[name=unpooled, suggested_max_allocation_size=" - + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE) - + ", factors={opensearch.unsafe.use_unpooled_allocator=" - + System.getProperty(USE_UNPOOLED) - + ", g1gc_enabled=" - + g1gcEnabled - + ", g1gc_region_size=" - + g1gcRegionSize - + ", heap_size=" - + heapSize - + "}]"; - } else { - int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena(); - int pageSize; - int maxOrder; - if (useDefaultChunkAndPageSize()) { - pageSize = PooledByteBufAllocator.defaultPageSize(); - maxOrder = PooledByteBufAllocator.defaultMaxOrder(); - } else { - pageSize = 8192; - if (g1gcEnabled == false || g1gcRegionSizeIsKnown == false || g1gcRegionSizeInBytes >= (4 * 1024 * 1024)) { - // This combined with a 8192 page size = 1 MB chunk sizes - maxOrder = 7; - } else if (g1gcRegionSizeInBytes >= (2 * 1024 * 1024)) { - // This combined with a 8192 page size = 512 KB chunk sizes - maxOrder = 6; - } else { - // This combined with a 8192 page size = 256 KB chunk sizes - maxOrder = 5; - } - } - int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize(); - int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize(); - int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize(); - boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads(); - delegate = new PooledByteBufAllocator( - false, - nHeapArena, - 0, - pageSize, - maxOrder, - tinyCacheSize, - smallCacheSize, - normalCacheSize, - useCacheForAllThreads - ); - int chunkSizeInBytes = pageSize << maxOrder; - ByteSizeValue chunkSize = new ByteSizeValue(chunkSizeInBytes); - SUGGESTED_MAX_ALLOCATION_SIZE = chunkSizeInBytes; - DESCRIPTION = "[name=opensearch_configured, chunk_size=" - + chunkSize - + ", suggested_max_allocation_size=" - + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE) - + ", factors={opensearch.unsafe.use_netty_default_chunk_and_page_size=" - + useDefaultChunkAndPageSize() - + ", g1gc_enabled=" - + g1gcEnabled - + ", g1gc_region_size=" - + g1gcRegionSize - + "}]"; - } - ALLOCATOR = new NoDirectBuffers(delegate); - } - } - - public static void logAllocatorDescriptionIfNeeded() { - if (descriptionLogged.compareAndSet(false, true)) { - logger.info("creating NettyAllocator with the following configs: " + NettyAllocator.getAllocatorDescription()); - } - } - - public static ByteBufAllocator getAllocator() { - return ALLOCATOR; - } - - public static long suggestedMaxAllocationSize() { - return SUGGESTED_MAX_ALLOCATION_SIZE; - } - - public static String getAllocatorDescription() { - return DESCRIPTION; - } - - public static Class getChannelType() { - if (ALLOCATOR instanceof NoDirectBuffers) { - return CopyBytesSocketChannel.class; - } else { - return Netty4NioSocketChannel.class; - } - } - - public static Class getServerChannelType() { - if (ALLOCATOR instanceof NoDirectBuffers) { - return CopyBytesServerSocketChannel.class; - } else { - return NioServerSocketChannel.class; - } - } - - private static boolean useUnpooled(long heapSizeInBytes, boolean g1gcEnabled, boolean g1gcRegionSizeIsKnown, long g1RegionSize) { - if (userForcedUnpooled()) { - return true; - } else if (userForcedPooled()) { - return true; - } else if (heapSizeInBytes <= 1 << 30) { - // If the heap is 1GB or less we use unpooled - return true; - } else if (g1gcEnabled == false) { - return false; - } else { - // If the G1GC is enabled and the region size is known and is less than 1MB we use unpooled. - boolean g1gcRegionIsLessThan1MB = g1RegionSize < 1 << 20; - return (g1gcRegionSizeIsKnown && g1gcRegionIsLessThan1MB); - } - } - - private static boolean userForcedUnpooled() { - if (System.getProperty(USE_UNPOOLED) != null) { - return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)); - } else { - return false; - } - } - - private static boolean userForcedPooled() { - if (System.getProperty(USE_UNPOOLED) != null) { - return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)) == false; - } else { - return false; - } - } - - private static boolean useDefaultChunkAndPageSize() { - if (System.getProperty(USE_NETTY_DEFAULT_CHUNK) != null) { - return Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT_CHUNK)); - } else { - return false; - } - } - - public static class NoDirectBuffers implements ByteBufAllocator { - - private final ByteBufAllocator delegate; - - private NoDirectBuffers(ByteBufAllocator delegate) { - this.delegate = delegate; - } - - @Override - public ByteBuf buffer() { - return heapBuffer(); - } - - @Override - public ByteBuf buffer(int initialCapacity) { - return heapBuffer(initialCapacity); - } - - @Override - public ByteBuf buffer(int initialCapacity, int maxCapacity) { - return heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf ioBuffer() { - return heapBuffer(); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity) { - return heapBuffer(initialCapacity); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { - return heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer() { - return delegate.heapBuffer(); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity) { - return delegate.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - return delegate.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf directBuffer() { - throw new UnsupportedOperationException("Direct buffers not supported"); - } - - @Override - public ByteBuf directBuffer(int initialCapacity) { - throw new UnsupportedOperationException("Direct buffers not supported"); - } - - @Override - public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Direct buffers not supported"); - } - - @Override - public CompositeByteBuf compositeBuffer() { - return compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeBuffer(int maxNumComponents) { - return compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeHeapBuffer() { - return delegate.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { - return delegate.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeDirectBuffer() { - throw new UnsupportedOperationException("Direct buffers not supported."); - } - - @Override - public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { - throw new UnsupportedOperationException("Direct buffers not supported."); - } - - @Override - public boolean isDirectBufferPooled() { - assert delegate.isDirectBufferPooled() == false; - return false; - } - - @Override - public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { - return delegate.calculateNewCapacity(minNewCapacity, maxCapacity); - } - - public ByteBufAllocator getDelegate() { - return delegate; - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/NettyByteBufSizer.java b/src/main/java/org/opensearch/sdk/netty4/NettyByteBufSizer.java deleted file mode 100644 index 1dd57487..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/NettyByteBufSizer.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -@ChannelHandler.Sharable -public class NettyByteBufSizer extends MessageToMessageDecoder { - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { - int readableBytes = buf.readableBytes(); - if (buf.capacity() >= 1024) { - ByteBuf resized = buf.discardReadBytes().capacity(readableBytes); - assert resized.readableBytes() == readableBytes; - out.add(resized.retain()); - } else { - out.add(buf.retain()); - } - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/OpenSearchLoggingHandler.java b/src/main/java/org/opensearch/sdk/netty4/OpenSearchLoggingHandler.java deleted file mode 100644 index 86916887..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/OpenSearchLoggingHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; - -final class OpenSearchLoggingHandler extends LoggingHandler { - - OpenSearchLoggingHandler() { - super(LogLevel.TRACE); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - // We do not want to log read complete events because we log inbound messages in the TcpTransport. - ctx.fireChannelReadComplete(); - } -} diff --git a/src/main/java/org/opensearch/sdk/netty4/SharedGroupFactory.java b/src/main/java/org/opensearch/sdk/netty4/SharedGroupFactory.java deleted file mode 100644 index 774f9b24..00000000 --- a/src/main/java/org/opensearch/sdk/netty4/SharedGroupFactory.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.sdk.netty4; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.Future; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.AbstractRefCounted; -import org.opensearch.transport.TcpTransport; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; - -/** - * Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for - * both {@code #getHttpGroup()} (disabled) and {@link #getTransportGroup()} if - * {@code org.opensearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} (disabled) is configured to be 0. - * If that setting is not 0, then it will return a different group in the {@code #getHttpGroup()} (disabled) call. - */ -public final class SharedGroupFactory { - - private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class); - - private final Settings settings; - private final int workerCount; - // Remove HTTP calls - // private final int httpWorkerCount; - - private RefCountedGroup genericGroup; - private SharedGroup dedicatedHttpGroup; - - public SharedGroupFactory(Settings settings) { - this.settings = settings; - this.workerCount = Netty4Transport.WORKER_COUNT.get(settings); - // Remove HTTP calls - // this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings); - } - - public Settings getSettings() { - return settings; - } - - public int getTransportWorkerCount() { - return workerCount; - } - - public synchronized SharedGroup getTransportGroup() { - return getGenericGroup(); - } - - // Remove HTTP calls - // public synchronized SharedGroup getHttpGroup() { - // if (httpWorkerCount == 0) { - // return getGenericGroup(); - // } else { - // if (dedicatedHttpGroup == null) { - // NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup( - // httpWorkerCount, - // daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX) - // ); - // dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup)); - // } - // return dedicatedHttpGroup; - // } - // } - - private SharedGroup getGenericGroup() { - if (genericGroup == null) { - EventLoopGroup eventLoopGroup = new NioEventLoopGroup( - workerCount, - daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX) - ); - this.genericGroup = new RefCountedGroup(eventLoopGroup); - } else { - genericGroup.incRef(); - } - return new SharedGroup(genericGroup); - } - - private static class RefCountedGroup extends AbstractRefCounted { - - public static final String NAME = "ref-counted-event-loop-group"; - private final EventLoopGroup eventLoopGroup; - - private RefCountedGroup(EventLoopGroup eventLoopGroup) { - super(NAME); - this.eventLoopGroup = eventLoopGroup; - } - - @Override - protected void closeInternal() { - Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); - shutdownFuture.awaitUninterruptibly(); - if (shutdownFuture.isSuccess() == false) { - logger.warn("Error closing netty event loop group", shutdownFuture.cause()); - } - } - } - - /** - * Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close, - * this wrapped instance can no longer be used. - */ - public static class SharedGroup { - - private final RefCountedGroup refCountedGroup; - - private final AtomicBoolean isOpen = new AtomicBoolean(true); - - private SharedGroup(RefCountedGroup refCountedGroup) { - this.refCountedGroup = refCountedGroup; - } - - public EventLoopGroup getLowLevelGroup() { - return refCountedGroup.eventLoopGroup; - } - - public void shutdown() { - if (isOpen.compareAndSet(true, false)) { - refCountedGroup.decRef(); - } - } - } -} diff --git a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java index e0b47e82..c3ef71ae 100644 --- a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java +++ b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java @@ -20,7 +20,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportSettings; -import org.opensearch.sdk.netty4.Netty4Transport; +import org.opensearch.transport.netty4.Netty4Transport; public class TestNetty4Transport extends OpenSearchTestCase { diff --git a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java index 6dbee1c2..cc384d37 100644 --- a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java +++ b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java @@ -17,7 +17,7 @@ import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.TransportAddress; -import org.opensearch.sdk.netty4.Netty4Transport; +import org.opensearch.transport.netty4.Netty4Transport; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService;