diff --git a/agent-module/agent/src/main/resources/pinpoint-root.config b/agent-module/agent/src/main/resources/pinpoint-root.config index 149fe89ef843..483e1e61c690 100644 --- a/agent-module/agent/src/main/resources/pinpoint-root.config +++ b/agent-module/agent/src/main/resources/pinpoint-root.config @@ -132,6 +132,16 @@ profiler.system.property.io.netty.tryReflectionSetAccessible=true # disable netty directbuffer profiler.system.property.io.netty.noPreferDirect=false +# Enable Grpc Hedging Policy +# https://github.com/grpc/proposal/blob/master/A6-client-retries.md#hedging-policy +profiler.transport.grpc.metadata.sender.retry.enable=false +profiler.transport.grpc.metadata.sender.max.attempts=3 +profiler.transport.grpc.metadata.sender.hedging.delay.millis=1000 +# 16777216 = 64m +profiler.transport.grpc.metadata.sender.retry.buffer.size=16777216 +# 1048576 = 1m +profiler.transport.grpc.metadata.sender.retry.per.rpc.buffer.limit=1048576 + ########################################################### # Profiler Global Configuration # ########################################################### diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java index 74eff80907ec..300354f35eed 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java @@ -34,6 +34,7 @@ import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.sender.grpc.MetadataGrpcDataSender; +import com.navercorp.pinpoint.profiler.sender.grpc.MetadataGrpcHedgingDataSender; import io.grpc.ClientInterceptor; import io.grpc.NameResolverProvider; import io.netty.handler.ssl.SslContext; @@ -81,17 +82,22 @@ public EnhancedDataSender get() { final String collectorIp = grpcTransportConfig.getMetadataCollectorIp(); final int collectorPort = grpcTransportConfig.getMetadataCollectorPort(); final boolean sslEnable = grpcTransportConfig.isMetadataSslEnable(); - final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize(); + final boolean clientRetryEnable = grpcTransportConfig.isMetadataRetryEnable(); final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable, clientRetryEnable); final ChannelFactory channelFactory = channelFactoryBuilder.build(); + final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize(); + + if (clientRetryEnable) { + return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory); + } final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount(); final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis(); - return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis, clientRetryEnable); + return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis); } protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, boolean clientRetryEnable) { @@ -99,7 +105,9 @@ protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, bool final UnaryCallDeadlineInterceptor unaryCallDeadlineInterceptor = new UnaryCallDeadlineInterceptor(grpcTransportConfig.getMetadataRequestTimeout()); final ClientOption clientOption = grpcTransportConfig.getMetadataClientOption(); - ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder("MetadataGrpcDataSender"); + final String factoryName = getChannelFactoryName(clientRetryEnable); + + ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder(factoryName); channelFactoryBuilder.setHeaderFactory(headerFactory); channelFactoryBuilder.setNameResolverProvider(nameResolverProvider); channelFactoryBuilder.addClientInterceptor(unaryCallDeadlineInterceptor); @@ -131,4 +139,11 @@ protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, bool return channelFactoryBuilder; } + + private String getChannelFactoryName(boolean clientRetryEnable) { + if (clientRetryEnable) { + return MetadataGrpcHedgingDataSender.class.getSimpleName(); + } + return MetadataGrpcDataSender.class.getSimpleName(); + } } \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java index a851094161c8..896a09739bc7 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java @@ -27,33 +27,44 @@ public class LogResponseStreamObserver implements StreamObserver { private final Logger logger; + private final String name; + private final long requestCount; - public LogResponseStreamObserver(Logger logger) { + public LogResponseStreamObserver(Logger logger, String name, long requestCount) { this.logger = Objects.requireNonNull(logger, "logger"); + this.name = Objects.requireNonNull(name, "name"); + this.requestCount = requestCount; } @Override public void onNext(ResT response) { if (logger.isDebugEnabled()) { - logger.debug("Request success. result={}", logString(response)); + logger.debug("{} Request success. result={}", logString(response)); } } @Override public void onError(Throwable throwable) { - final StatusError statusError = StatusErrors.throwable(throwable); - if (statusError.isSimpleError()) { - logger.info("Error. cause={}", statusError.getMessage()); - } else { - logger.info("Error. cause={}", statusError.getMessage(), statusError.getThrowable()); + if (logger.isInfoEnabled()) { + final StatusError statusError = StatusErrors.throwable(throwable); + if (statusError.isSimpleError()) { + logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage(), statusError.getThrowable()); + } else { + logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage()); + } + + } } } @Override public void onCompleted() { if (logger.isDebugEnabled()) { - logger.debug("onCompleted"); + logger.debug("{} onCompleted. requestCount={}", requestCount, name); } } @@ -73,6 +84,7 @@ private String logString(Object message) { public String toString() { return "LogResponseStreamObserver{" + "logger=" + logger + + ", name='" + name + '\'' + '}'; } } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java index 92d69dc3da3d..90b7b68a7d83 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java @@ -49,7 +49,6 @@ public class MetadataGrpcDataSender extends GrpcDataSender implements Enha private final MetadataGrpc.MetadataStub metadataStub; private final int maxAttempts; private final int retryDelayMillis; - private final boolean clientRetryEnable; private final Timer retryTimer; private static final long MAX_PENDING_TIMEOUTS = 1024 * 4; @@ -58,7 +57,7 @@ public class MetadataGrpcDataSender extends GrpcDataSender implements Enha public MetadataGrpcDataSender(String host, int port, int executorQueueSize, MessageConverter messageConverter, - ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis, boolean clientRetryEnable) { + ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis) { super(host, port, executorQueueSize, messageConverter, channelFactory); this.maxAttempts = getMaxAttempts(retryMaxCount); @@ -78,7 +77,6 @@ public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCoun MetadataGrpcDataSender.this.scheduleNextRetry(request, remainingRetryCount); } }; - this.clientRetryEnable = clientRetryEnable; } private int getMaxAttempts(int retryMaxCount) { @@ -104,47 +102,13 @@ public boolean request(T data, BiConsumer listener) throw new UnsupportedOperationException("unsupported operation request(data, listener)"); } - //send with retry @Override public boolean send(T data) { - try { - final GeneratedMessageV3 message = messageConverter.toMessage(data); - - if (message instanceof PSqlMetaData) { - final PSqlMetaData sqlMetaData = (PSqlMetaData) message; - this.metadataStub.requestSqlMetaData(sqlMetaData, newLogStreamObserver()); - } else if (message instanceof PSqlUidMetaData) { - final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message; - this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, newLogStreamObserver()); - } else if (message instanceof PApiMetaData) { - final PApiMetaData apiMetaData = (PApiMetaData) message; - this.metadataStub.requestApiMetaData(apiMetaData, newLogStreamObserver()); - } else if (message instanceof PStringMetaData) { - final PStringMetaData stringMetaData = (PStringMetaData) message; - this.metadataStub.requestStringMetaData(stringMetaData, newLogStreamObserver()); - } else if (message instanceof PExceptionMetaData) { - final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message; - this.metadataStub.requestExceptionMetaData(exceptionMetaData, newLogStreamObserver()); - } else { - logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message)); - } - } catch (Exception e) { - logger.info("Failed to send metadata={}", data, e); - return false; - } - return true; - } - - private StreamObserver newLogStreamObserver() { - return new LogResponseStreamObserver<>(logger); + throw new UnsupportedOperationException("unsupported operation send(data)"); } @Override public boolean request(final T data) { - if (clientRetryEnable) { - return this.send(data); - } - final Runnable convertAndRun = new Runnable() { @Override public void run() { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java new file mode 100644 index 000000000000..6de2116b3c91 --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java @@ -0,0 +1,116 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.profiler.sender.grpc; + +import com.google.protobuf.GeneratedMessageV3; +import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; +import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.grpc.MessageFormatUtils; +import com.navercorp.pinpoint.grpc.client.ChannelFactory; +import com.navercorp.pinpoint.grpc.trace.MetadataGrpc; +import com.navercorp.pinpoint.grpc.trace.PApiMetaData; +import com.navercorp.pinpoint.grpc.trace.PExceptionMetaData; +import com.navercorp.pinpoint.grpc.trace.PResult; +import com.navercorp.pinpoint.grpc.trace.PSqlMetaData; +import com.navercorp.pinpoint.grpc.trace.PSqlUidMetaData; +import com.navercorp.pinpoint.grpc.trace.PStringMetaData; +import com.navercorp.pinpoint.io.ResponseMessage; +import io.grpc.stub.StreamObserver; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +/** + */ +public class MetadataGrpcHedgingDataSender extends GrpcDataSender implements EnhancedDataSender { + // + private final MetadataGrpc.MetadataStub metadataStub; + + private final AtomicLong requestCount = new AtomicLong(0); + + public MetadataGrpcHedgingDataSender(String host, int port, int executorQueueSize, + MessageConverter messageConverter, + ChannelFactory channelFactory) { + super(host, port, executorQueueSize, messageConverter, channelFactory); + + this.metadataStub = MetadataGrpc.newStub(managedChannel); + } + + // Unsupported Operation + @Override + public boolean request(T data, int retry) { + throw new UnsupportedOperationException("unsupported operation request(data, retry)"); + } + + @Override + public boolean request(T data, BiConsumer listener) { + throw new UnsupportedOperationException("unsupported operation request(data, listener)"); + } + + @Override + public boolean send(T data) { + throw new UnsupportedOperationException("unsupported operation send(data)"); + } + + @Override + public boolean request(final T data) { + try { + final GeneratedMessageV3 message = messageConverter.toMessage(data); + + if (message instanceof PSqlMetaData) { + final PSqlMetaData sqlMetaData = (PSqlMetaData) message; + this.metadataStub.requestSqlMetaData(sqlMetaData, newLogStreamObserver(sqlMetaData)); + } else if (message instanceof PSqlUidMetaData) { + final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message; + this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, newLogStreamObserver(sqlUidMetaData)); + } else if (message instanceof PApiMetaData) { + final PApiMetaData apiMetaData = (PApiMetaData) message; + this.metadataStub.requestApiMetaData(apiMetaData, newLogStreamObserver(apiMetaData)); + } else if (message instanceof PStringMetaData) { + final PStringMetaData stringMetaData = (PStringMetaData) message; + this.metadataStub.requestStringMetaData(stringMetaData, newLogStreamObserver(stringMetaData)); + } else if (message instanceof PExceptionMetaData) { + final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message; + this.metadataStub.requestExceptionMetaData(exceptionMetaData, newLogStreamObserver(exceptionMetaData)); + } else { + if (logger.isWarnEnabled()) { + logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message)); + } + } + } catch (Throwable e) { + logger.info("Failed to send metadata={}", data, e); + return false; + } + return true; + } + + private StreamObserver newLogStreamObserver(GeneratedMessageV3 message) { + String type = message.getClass().getSimpleName(); + long requestCount = this.requestCount.incrementAndGet(); + return new LogResponseStreamObserver<>(logger, type, requestCount); + } + + @Override + public void stop() { + if (shutdown) { + return; + } + this.shutdown = true; + + super.release(); + } +} \ No newline at end of file