From 89a97e8cfe1ca0c0f629c567bbaf36029abbe6bb Mon Sep 17 00:00:00 2001 From: emeroad Date: Mon, 14 Oct 2024 18:27:17 +0900 Subject: [PATCH] [#11497] Apply ClientCallStateStreamObserver --- .../sender/grpc/ClientStreamingService.java | 4 +-- .../grpc/DefaultStreamEventListener.java | 14 ++++----- .../sender/grpc/ResponseStreamObserver.java | 25 ++++++++++++--- .../sender/grpc/SpanGrpcDataSender.java | 10 ++++-- .../sender/grpc/StatGrpcDataSender.java | 9 ++++-- .../sender/grpc/StreamEventListener.java | 4 +-- .../grpc/stream/ClientStreamingProvider.java | 4 +-- .../sender/grpc/stream/DefaultStreamTask.java | 31 +++++++++++-------- .../sender/grpc/stream/StreamJob.java | 4 +-- .../pinpoint/profiler/util/NamedRunnable.java | 2 +- .../receiver/grpc/SpanClientMock.java | 4 +-- 11 files changed, 70 insertions(+), 41 deletions(-) diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ClientStreamingService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ClientStreamingService.java index a839ea9ea21f..fbe58d9b7582 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ClientStreamingService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ClientStreamingService.java @@ -1,8 +1,8 @@ package com.navercorp.pinpoint.profiler.sender.grpc; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.profiler.sender.grpc.stream.ClientStreamingProvider; import com.navercorp.pinpoint.profiler.sender.grpc.stream.StreamJob; -import io.grpc.stub.ClientCallStreamObserver; import java.util.Objects; @@ -16,7 +16,7 @@ public ClientStreamingService(ClientStreamingProvider clientStreamin this.reconnector = Objects.requireNonNull(reconnector, "reconnector"); } - public ClientCallStreamObserver newStream(StreamJob streamJob) { + public ClientCallStateStreamObserver newStream(StreamJob streamJob) { ResponseStreamObserver response = newResponse(streamJob); return clientStreamingProvider.newStream(response); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/DefaultStreamEventListener.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/DefaultStreamEventListener.java index a864d9bcf608..6fc8fa40db9a 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/DefaultStreamEventListener.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/DefaultStreamEventListener.java @@ -1,7 +1,7 @@ package com.navercorp.pinpoint.profiler.sender.grpc; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.profiler.sender.grpc.stream.StreamJob; -import io.grpc.stub.ClientCallStreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -21,7 +21,7 @@ public DefaultStreamEventListener(Reconnector reconnector, StreamJob strea } @Override - public void start(final ClientCallStreamObserver requestStream) { + public void start(final ClientCallStateStreamObserver requestStream) { this.handle = streamJob.start(requestStream); reconnector.reset(); } @@ -29,15 +29,15 @@ public void start(final ClientCallStreamObserver requestStream) { @Override public void onError(Throwable t) { - final Future handle = this.handle; - if (handle != null) { - handle.cancel(true); - } - reconnector.reconnect(); + cancel(); } @Override public void onCompleted() { + cancel(); + } + + private void cancel() { final Future handle = this.handle; if (handle != null) { handle.cancel(true); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ResponseStreamObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ResponseStreamObserver.java index 0f3948553030..430f9c9af48f 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ResponseStreamObserver.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ResponseStreamObserver.java @@ -16,6 +16,8 @@ package com.navercorp.pinpoint.profiler.sender.grpc; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; +import com.navercorp.pinpoint.grpc.stream.StreamUtils; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; @@ -33,6 +35,7 @@ public class ResponseStreamObserver implements ClientResponseObserve private final Logger logger = LogManager.getLogger(this.getClass()); + private ClientCallStateStreamObserver requestStream; private final StreamEventListener listener; public ResponseStreamObserver(StreamEventListener listener) { @@ -40,9 +43,11 @@ public ResponseStreamObserver(StreamEventListener listener) { } @Override - public void beforeStart(final ClientCallStreamObserver requestStream) { + public void beforeStart(final ClientCallStreamObserver stream) { + this.requestStream = ClientCallStateStreamObserver.clientCall(stream); + logger.info("beforeStart {}", listener); - requestStream.setOnReadyHandler(new Runnable() { + this.requestStream.setOnReadyHandler(new Runnable() { private final AtomicLong isReadyCounter = new AtomicLong(0); @Override @@ -56,6 +61,10 @@ public void run() { }); } + public ClientCallStateStreamObserver getRequestStream() { + return requestStream; + } + @Override public void onNext(ResT value) { if (logger.isDebugEnabled()) { @@ -68,15 +77,23 @@ public void onError(Throwable t) { Status status = Status.fromThrowable(t); Metadata metadata = Status.trailersFromThrowable(t); - logger.info("Failed to stream, name={}, {} {}", listener, status, metadata); + logger.info("onError Failed to stream, name={}, {} {}", listener, status, metadata); listener.onError(t); + + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("ResponseStreamObserver.onError", th)); + } } @Override public void onCompleted() { - logger.info("{} onCompleted", listener); + logger.info("onCompleted {}", listener); listener.onCompleted(); + + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("ResponseStreamObserver.onCompleted", th)); + } } @Override diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java index 78454366de80..226037b0d1d4 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java @@ -21,6 +21,7 @@ import com.google.protobuf.GeneratedMessageV3; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; import com.navercorp.pinpoint.grpc.client.ChannelFactory; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.grpc.trace.PSpan; import com.navercorp.pinpoint.grpc.trace.PSpanChunk; import com.navercorp.pinpoint.grpc.trace.PSpanMessage; @@ -47,6 +48,7 @@ */ public class SpanGrpcDataSender extends GrpcDataSender { + private final SpanGrpc.SpanStub spanStub; private final ReconnectExecutor reconnectExecutor; private final Reconnector reconnector; @@ -95,6 +97,7 @@ public SpanGrpcDataSender(String host, int port, StreamState failState, long maxRpcAgeMillis) { super(host, port, executorQueueSize, messageConverter, channelFactory); + this.spanStub = SpanGrpc.newStub(managedChannel); this.interval = newIntervalFunction(maxRpcAgeMillis); this.rpcExpiredAt = new AtomicLong(System.currentTimeMillis()); @@ -112,14 +115,15 @@ public void run() { ClientStreamingProvider clientStreamProvider = new ClientStreamingProvider() { @Override - public ClientCallStreamObserver newStream(ResponseStreamObserver response) { + public ClientCallStateStreamObserver newStream(ResponseStreamObserver response) { final ManagedChannel managedChannel = SpanGrpcDataSender.this.managedChannel; String authority = managedChannel.authority(); final ConnectivityState state = managedChannel.getState(false); SpanGrpcDataSender.this.logger.info("newStream {}/{} state:{} isShutdown:{} isTerminated:{}", id, authority, state, managedChannel.isShutdown(), managedChannel.isTerminated()); - SpanGrpc.SpanStub spanStub = SpanGrpc.newStub(managedChannel); - return (ClientCallStreamObserver) spanStub.sendSpan(response); + spanStub.sendSpan(response); + + return response.getRequestStream(); } }; diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java index d7d017a9c724..74f0c381cf1f 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java @@ -21,6 +21,7 @@ import com.google.protobuf.GeneratedMessageV3; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; import com.navercorp.pinpoint.grpc.client.ChannelFactory; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.grpc.trace.PAgentStat; import com.navercorp.pinpoint.grpc.trace.PAgentStatBatch; import com.navercorp.pinpoint.grpc.trace.PAgentUriStat; @@ -44,6 +45,7 @@ public class StatGrpcDataSender extends GrpcDataSender { private static final String ID = "StatStream"; + private final StatGrpc.StatStub statStub; private final ReconnectExecutor reconnectExecutor; private final Reconnector reconnector; @@ -100,6 +102,7 @@ public StatGrpcDataSender(String host, int port, ReconnectExecutor reconnectExecutor, ChannelFactory channelFactory) { super(host, port, executorQueueSize, messageConverter, channelFactory); + this.statStub = StatGrpc.newStub(managedChannel); this.reconnectExecutor = Objects.requireNonNull(reconnectExecutor, "reconnectExecutor"); final Runnable reconnectJob = new NamedRunnable(ID) { @@ -114,10 +117,10 @@ public void run() { ClientStreamingProvider clientStreamProvider = new ClientStreamingProvider() { @Override - public ClientCallStreamObserver newStream(ResponseStreamObserver response) { + public ClientCallStateStreamObserver newStream(ResponseStreamObserver response) { logger.info("newStream {}", ID); - StatGrpc.StatStub statStub = StatGrpc.newStub(managedChannel); - return (ClientCallStreamObserver) statStub.sendAgentStat(response); + statStub.sendAgentStat(response); + return response.getRequestStream(); } }; this.clientStreamService = new ClientStreamingService<>(clientStreamProvider, reconnector); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamEventListener.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamEventListener.java index f9b29b82427c..8afe76353448 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamEventListener.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamEventListener.java @@ -1,9 +1,9 @@ package com.navercorp.pinpoint.profiler.sender.grpc; -import io.grpc.stub.ClientCallStreamObserver; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; public interface StreamEventListener { - void start(ClientCallStreamObserver requestStream); + void start(ClientCallStateStreamObserver requestStream); void onError(Throwable t); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/ClientStreamingProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/ClientStreamingProvider.java index 588cd07cfa3b..72c13daf7d90 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/ClientStreamingProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/ClientStreamingProvider.java @@ -1,8 +1,8 @@ package com.navercorp.pinpoint.profiler.sender.grpc.stream; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.profiler.sender.grpc.ResponseStreamObserver; -import io.grpc.stub.ClientCallStreamObserver; public interface ClientStreamingProvider { - ClientCallStreamObserver newStream(ResponseStreamObserver response); + ClientCallStateStreamObserver newStream(ResponseStreamObserver response); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/DefaultStreamTask.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/DefaultStreamTask.java index d0b9bccf478a..2ebac701de1b 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/DefaultStreamTask.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/DefaultStreamTask.java @@ -1,12 +1,13 @@ package com.navercorp.pinpoint.profiler.sender.grpc.stream; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; +import com.navercorp.pinpoint.grpc.stream.StreamUtils; import com.navercorp.pinpoint.profiler.sender.grpc.ClientStreamingService; import com.navercorp.pinpoint.profiler.sender.grpc.MessageDispatcher; import com.navercorp.pinpoint.profiler.sender.grpc.StreamId; import com.navercorp.pinpoint.profiler.sender.grpc.StreamState; import com.navercorp.pinpoint.profiler.sender.grpc.StreamTask; import com.navercorp.pinpoint.profiler.util.NamedRunnable; -import io.grpc.stub.ClientCallStreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,7 +29,7 @@ public class DefaultStreamTask implements StreamTask { private final MessageDispatcher dispatcher; private final StreamState failState; - private volatile ClientCallStreamObserver stream; + private volatile ClientCallStateStreamObserver stream; private volatile CountDownLatch latch; private volatile boolean stop = false; @@ -49,7 +50,7 @@ public void start() { this.latch = new CountDownLatch(1); StreamJob job = new StreamJob() { @Override - public Future start(final ClientCallStreamObserver requestStream) { + public Future start(final ClientCallStateStreamObserver requestStream) { Runnable runnable = DefaultStreamTask.this.newRunnable(requestStream, latch); StreamExecutor streamExecutor = streamExecutorFactory.newStreamExecutor(); return streamExecutor.execute(runnable); @@ -70,14 +71,14 @@ enum FinishStatus { ISREADY_ERROR } - public Runnable newRunnable(final ClientCallStreamObserver requestStream, final CountDownLatch latch) { + public Runnable newRunnable(final ClientCallStateStreamObserver requestStream, final CountDownLatch latch) { return new NamedRunnable(streamId.toString()) { @Override public void run() { dispatch(requestStream); } - private void dispatch(ClientCallStreamObserver stream) { + private void dispatch(ClientCallStateStreamObserver stream) { logger.info("dispatch start {}", this); FinishStatus status = FinishStatus.UNKNOWN; @@ -97,8 +98,7 @@ private void dispatch(ClientCallStreamObserver stream) { failState.fail(); if (failState.isFailure()) { - logger.info("isReadyState error, Trigger stream.cancel {}", this); - stream.cancel("isReadyState error", new Exception("isReadyState error")); + logger.info("isReadyState error {}", this); status = FinishStatus.ISREADY_ERROR; break; } @@ -110,7 +110,9 @@ private void dispatch(ClientCallStreamObserver stream) { status = FinishStatus.INTERRUPTED; } catch (Throwable th) { logger.error("Unexpected DispatchThread error {}/{}", Thread.currentThread().getName(), this, th); - stream.onError(th); + } + if (stream.isRun()) { + StreamUtils.onCompleted(stream, (ex) -> logger.info("stream stop", ex)); } logger.info("dispatch thread end status:{} {}", status, this); @@ -120,17 +122,20 @@ private void dispatch(ClientCallStreamObserver stream) { }; } - @Override public void stop() { logger.info("stop start {}", this.streamId); - + if (stop) { + logger.info("already stop {}", this.streamId); + return; + } this.stop = true; - final ClientCallStreamObserver copy = this.stream; + final ClientCallStateStreamObserver copy = this.stream; if (copy != null) { -// copy.cancel("stream stop", new Exception("stream stop")); - copy.onCompleted(); + if (copy.isRun()) { + StreamUtils.onCompleted(copy, (th) -> logger.info("stream stop", th)); + } } final CountDownLatch latch = this.latch; if (latch != null) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/StreamJob.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/StreamJob.java index 5279e8d12b35..3c709d10763a 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/StreamJob.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/stream/StreamJob.java @@ -1,11 +1,11 @@ package com.navercorp.pinpoint.profiler.sender.grpc.stream; -import io.grpc.stub.ClientCallStreamObserver; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import java.util.concurrent.Future; public interface StreamJob { - Future start(final ClientCallStreamObserver requestStream); + Future start(final ClientCallStateStreamObserver requestStream); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/util/NamedRunnable.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/util/NamedRunnable.java index b2fa6072bdd9..79b37e378cf5 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/util/NamedRunnable.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/util/NamedRunnable.java @@ -9,6 +9,6 @@ public NamedRunnable(String name) { @Override public String toString() { - return "name='" + name; + return name; } } diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java index 9e8efa57ffef..3372724fbe47 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java @@ -29,6 +29,7 @@ import com.navercorp.pinpoint.grpc.client.interceptor.DiscardClientInterceptor; import com.navercorp.pinpoint.grpc.client.interceptor.DiscardEventListener; import com.navercorp.pinpoint.grpc.client.interceptor.LoggingDiscardEventListener; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; import com.navercorp.pinpoint.grpc.trace.PAnnotation; import com.navercorp.pinpoint.grpc.trace.PAnnotationValue; import com.navercorp.pinpoint.grpc.trace.PSpan; @@ -44,7 +45,6 @@ import com.navercorp.pinpoint.profiler.sender.grpc.StreamId; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; -import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -136,7 +136,7 @@ private StreamObserver newSpanStream() { StreamEventListener listener = new StreamEventListener() { @Override - public void start(ClientCallStreamObserver requestStream) { + public void start(ClientCallStateStreamObserver requestStream) { spanStreamReconnector.reset(); }