Skip to content

Commit

Permalink
[#11497] Apply ClientCallStateStreamObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 16, 2024
1 parent b2fdded commit 89a97e8
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.profiler.sender.grpc.stream.ClientStreamingProvider;
import com.navercorp.pinpoint.profiler.sender.grpc.stream.StreamJob;
import io.grpc.stub.ClientCallStreamObserver;

import java.util.Objects;

Expand All @@ -16,7 +16,7 @@ public ClientStreamingService(ClientStreamingProvider<ReqT, ResT> clientStreamin
this.reconnector = Objects.requireNonNull(reconnector, "reconnector");
}

public ClientCallStreamObserver<ReqT> newStream(StreamJob<ReqT> streamJob) {
public ClientCallStateStreamObserver<ReqT> newStream(StreamJob<ReqT> streamJob) {
ResponseStreamObserver<ReqT, ResT> response = newResponse(streamJob);
return clientStreamingProvider.newStream(response);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.profiler.sender.grpc.stream.StreamJob;
import io.grpc.stub.ClientCallStreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -21,23 +21,23 @@ public DefaultStreamEventListener(Reconnector reconnector, StreamJob<ReqT> strea
}

@Override
public void start(final ClientCallStreamObserver<ReqT> requestStream) {
public void start(final ClientCallStateStreamObserver<ReqT> requestStream) {
this.handle = streamJob.start(requestStream);
reconnector.reset();
}


@Override
public void onError(Throwable t) {
final Future<?> handle = this.handle;
if (handle != null) {
handle.cancel(true);
}
reconnector.reconnect();
cancel();
}

@Override
public void onCompleted() {
cancel();
}

private void cancel() {
final Future<?> handle = this.handle;
if (handle != null) {
handle.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.stream.StreamUtils;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
Expand All @@ -33,16 +35,19 @@ public class ResponseStreamObserver<ReqT, ResT> implements ClientResponseObserve

private final Logger logger = LogManager.getLogger(this.getClass());

private ClientCallStateStreamObserver<ReqT> requestStream;
private final StreamEventListener<ReqT> listener;

public ResponseStreamObserver(StreamEventListener<ReqT> listener) {
this.listener = Objects.requireNonNull(listener, "listener");
}

@Override
public void beforeStart(final ClientCallStreamObserver<ReqT> requestStream) {
public void beforeStart(final ClientCallStreamObserver<ReqT> stream) {
this.requestStream = ClientCallStateStreamObserver.clientCall(stream);

logger.info("beforeStart {}", listener);
requestStream.setOnReadyHandler(new Runnable() {
this.requestStream.setOnReadyHandler(new Runnable() {
private final AtomicLong isReadyCounter = new AtomicLong(0);

@Override
Expand All @@ -56,6 +61,10 @@ public void run() {
});
}

public ClientCallStateStreamObserver<ReqT> getRequestStream() {
return requestStream;
}

@Override
public void onNext(ResT value) {
if (logger.isDebugEnabled()) {
Expand All @@ -68,15 +77,23 @@ public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
Metadata metadata = Status.trailersFromThrowable(t);

logger.info("Failed to stream, name={}, {} {}", listener, status, metadata);
logger.info("onError Failed to stream, name={}, {} {}", listener, status, metadata);

listener.onError(t);

if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("ResponseStreamObserver.onError", th));
}
}

@Override
public void onCompleted() {
logger.info("{} onCompleted", listener);
logger.info("onCompleted {}", listener);
listener.onCompleted();

if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("ResponseStreamObserver.onCompleted", th));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.trace.PSpan;
import com.navercorp.pinpoint.grpc.trace.PSpanChunk;
import com.navercorp.pinpoint.grpc.trace.PSpanMessage;
Expand All @@ -47,6 +48,7 @@
*/
public class SpanGrpcDataSender extends GrpcDataSender<SpanType> {

private final SpanGrpc.SpanStub spanStub;
private final ReconnectExecutor reconnectExecutor;

private final Reconnector reconnector;
Expand Down Expand Up @@ -95,6 +97,7 @@ public SpanGrpcDataSender(String host, int port,
StreamState failState,
long maxRpcAgeMillis) {
super(host, port, executorQueueSize, messageConverter, channelFactory);
this.spanStub = SpanGrpc.newStub(managedChannel);

this.interval = newIntervalFunction(maxRpcAgeMillis);
this.rpcExpiredAt = new AtomicLong(System.currentTimeMillis());
Expand All @@ -112,14 +115,15 @@ public void run() {

ClientStreamingProvider<PSpanMessage, Empty> clientStreamProvider = new ClientStreamingProvider<PSpanMessage, Empty>() {
@Override
public ClientCallStreamObserver<PSpanMessage> newStream(ResponseStreamObserver<PSpanMessage, Empty> response) {
public ClientCallStateStreamObserver<PSpanMessage> newStream(ResponseStreamObserver<PSpanMessage, Empty> response) {
final ManagedChannel managedChannel = SpanGrpcDataSender.this.managedChannel;
String authority = managedChannel.authority();
final ConnectivityState state = managedChannel.getState(false);
SpanGrpcDataSender.this.logger.info("newStream {}/{} state:{} isShutdown:{} isTerminated:{}", id, authority, state, managedChannel.isShutdown(), managedChannel.isTerminated());

SpanGrpc.SpanStub spanStub = SpanGrpc.newStub(managedChannel);
return (ClientCallStreamObserver<PSpanMessage>) spanStub.sendSpan(response);
spanStub.sendSpan(response);

return response.getRequestStream();
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.trace.PAgentStat;
import com.navercorp.pinpoint.grpc.trace.PAgentStatBatch;
import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
Expand All @@ -44,6 +45,7 @@
public class StatGrpcDataSender extends GrpcDataSender<MetricType> {
private static final String ID = "StatStream";

private final StatGrpc.StatStub statStub;
private final ReconnectExecutor reconnectExecutor;

private final Reconnector reconnector;
Expand Down Expand Up @@ -100,6 +102,7 @@ public StatGrpcDataSender(String host, int port,
ReconnectExecutor reconnectExecutor,
ChannelFactory channelFactory) {
super(host, port, executorQueueSize, messageConverter, channelFactory);
this.statStub = StatGrpc.newStub(managedChannel);

this.reconnectExecutor = Objects.requireNonNull(reconnectExecutor, "reconnectExecutor");
final Runnable reconnectJob = new NamedRunnable(ID) {
Expand All @@ -114,10 +117,10 @@ public void run() {

ClientStreamingProvider<PStatMessage, Empty> clientStreamProvider = new ClientStreamingProvider<PStatMessage, Empty>() {
@Override
public ClientCallStreamObserver<PStatMessage> newStream(ResponseStreamObserver<PStatMessage, Empty> response) {
public ClientCallStateStreamObserver<PStatMessage> newStream(ResponseStreamObserver<PStatMessage, Empty> response) {
logger.info("newStream {}", ID);
StatGrpc.StatStub statStub = StatGrpc.newStub(managedChannel);
return (ClientCallStreamObserver<PStatMessage>) statStub.sendAgentStat(response);
statStub.sendAgentStat(response);
return response.getRequestStream();
}
};
this.clientStreamService = new ClientStreamingService<>(clientStreamProvider, reconnector);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import io.grpc.stub.ClientCallStreamObserver;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;

public interface StreamEventListener<ReqT> {
void start(ClientCallStreamObserver<ReqT> requestStream);
void start(ClientCallStateStreamObserver<ReqT> requestStream);

void onError(Throwable t);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.navercorp.pinpoint.profiler.sender.grpc.stream;

import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.profiler.sender.grpc.ResponseStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;

public interface ClientStreamingProvider<ReqT, ResT> {
ClientCallStreamObserver<ReqT> newStream(ResponseStreamObserver<ReqT, ResT> response);
ClientCallStateStreamObserver<ReqT> newStream(ResponseStreamObserver<ReqT, ResT> response);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.navercorp.pinpoint.profiler.sender.grpc.stream;

import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.stream.StreamUtils;
import com.navercorp.pinpoint.profiler.sender.grpc.ClientStreamingService;
import com.navercorp.pinpoint.profiler.sender.grpc.MessageDispatcher;
import com.navercorp.pinpoint.profiler.sender.grpc.StreamId;
import com.navercorp.pinpoint.profiler.sender.grpc.StreamState;
import com.navercorp.pinpoint.profiler.sender.grpc.StreamTask;
import com.navercorp.pinpoint.profiler.util.NamedRunnable;
import io.grpc.stub.ClientCallStreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -28,7 +29,7 @@ public class DefaultStreamTask<M, ReqT, ResT> implements StreamTask<M, ReqT> {
private final MessageDispatcher<M, ReqT> dispatcher;
private final StreamState failState;

private volatile ClientCallStreamObserver<ReqT> stream;
private volatile ClientCallStateStreamObserver<ReqT> stream;
private volatile CountDownLatch latch;
private volatile boolean stop = false;

Expand All @@ -49,7 +50,7 @@ public void start() {
this.latch = new CountDownLatch(1);
StreamJob<ReqT> job = new StreamJob<ReqT>() {
@Override
public Future<?> start(final ClientCallStreamObserver<ReqT> requestStream) {
public Future<?> start(final ClientCallStateStreamObserver<ReqT> requestStream) {
Runnable runnable = DefaultStreamTask.this.newRunnable(requestStream, latch);
StreamExecutor<ReqT> streamExecutor = streamExecutorFactory.newStreamExecutor();
return streamExecutor.execute(runnable);
Expand All @@ -70,14 +71,14 @@ enum FinishStatus {
ISREADY_ERROR
}

public Runnable newRunnable(final ClientCallStreamObserver<ReqT> requestStream, final CountDownLatch latch) {
public Runnable newRunnable(final ClientCallStateStreamObserver<ReqT> requestStream, final CountDownLatch latch) {
return new NamedRunnable(streamId.toString()) {
@Override
public void run() {
dispatch(requestStream);
}

private void dispatch(ClientCallStreamObserver<ReqT> stream) {
private void dispatch(ClientCallStateStreamObserver<ReqT> stream) {
logger.info("dispatch start {}", this);
FinishStatus status = FinishStatus.UNKNOWN;

Expand All @@ -97,8 +98,7 @@ private void dispatch(ClientCallStreamObserver<ReqT> stream) {
failState.fail();

if (failState.isFailure()) {
logger.info("isReadyState error, Trigger stream.cancel {}", this);
stream.cancel("isReadyState error", new Exception("isReadyState error"));
logger.info("isReadyState error {}", this);
status = FinishStatus.ISREADY_ERROR;
break;
}
Expand All @@ -110,7 +110,9 @@ private void dispatch(ClientCallStreamObserver<ReqT> stream) {
status = FinishStatus.INTERRUPTED;
} catch (Throwable th) {
logger.error("Unexpected DispatchThread error {}/{}", Thread.currentThread().getName(), this, th);
stream.onError(th);
}
if (stream.isRun()) {
StreamUtils.onCompleted(stream, (ex) -> logger.info("stream stop", ex));
}

logger.info("dispatch thread end status:{} {}", status, this);
Expand All @@ -120,17 +122,20 @@ private void dispatch(ClientCallStreamObserver<ReqT> stream) {
};
}


@Override
public void stop() {
logger.info("stop start {}", this.streamId);

if (stop) {
logger.info("already stop {}", this.streamId);
return;
}
this.stop = true;

final ClientCallStreamObserver<ReqT> copy = this.stream;
final ClientCallStateStreamObserver<ReqT> copy = this.stream;
if (copy != null) {
// copy.cancel("stream stop", new Exception("stream stop"));
copy.onCompleted();
if (copy.isRun()) {
StreamUtils.onCompleted(copy, (th) -> logger.info("stream stop", th));
}
}
final CountDownLatch latch = this.latch;
if (latch != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.navercorp.pinpoint.profiler.sender.grpc.stream;

import io.grpc.stub.ClientCallStreamObserver;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;

import java.util.concurrent.Future;

public interface StreamJob<ReqT> {

Future<?> start(final ClientCallStreamObserver<ReqT> requestStream);
Future<?> start(final ClientCallStateStreamObserver<ReqT> requestStream);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public NamedRunnable(String name) {

@Override
public String toString() {
return "name='" + name;
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.navercorp.pinpoint.grpc.client.interceptor.DiscardClientInterceptor;
import com.navercorp.pinpoint.grpc.client.interceptor.DiscardEventListener;
import com.navercorp.pinpoint.grpc.client.interceptor.LoggingDiscardEventListener;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.trace.PAnnotation;
import com.navercorp.pinpoint.grpc.trace.PAnnotationValue;
import com.navercorp.pinpoint.grpc.trace.PSpan;
Expand All @@ -44,7 +45,6 @@
import com.navercorp.pinpoint.profiler.sender.grpc.StreamId;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -136,7 +136,7 @@ private StreamObserver<PSpanMessage> newSpanStream() {
StreamEventListener<PSpanMessage> listener = new StreamEventListener<PSpanMessage>() {

@Override
public void start(ClientCallStreamObserver<PSpanMessage> requestStream) {
public void start(ClientCallStateStreamObserver<PSpanMessage> requestStream) {
spanStreamReconnector.reset();
}

Expand Down

0 comments on commit 89a97e8

Please sign in to comment.