Skip to content

Commit

Permalink
[#noissue] Improve reuse of CommandStub
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 15, 2024
1 parent 5240a1a commit 68e3525
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 53 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.navercorp.pinpoint.profiler.receiver.ProfilerCommandServiceLocator;
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import com.navercorp.pinpoint.profiler.sender.grpc.Reconnector;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
Expand All @@ -34,6 +35,7 @@
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.Set;

/**
* @author Taejin Koo
Expand All @@ -43,7 +45,9 @@ public class GrpcCommandService {
private final Logger logger = LogManager.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final CommandServiceStubFactory commandServiceStubFactory;
private final ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandDispatchStub;
// Requires commandServiceCodes
private final ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandChannelStub;
private final ProfilerCommandServiceLocator profilerCommandServiceLocator;

private final Reconnector reconnector;
Expand All @@ -52,10 +56,12 @@ public class GrpcCommandService {

private volatile CommandServiceMainStreamObserver commandServiceMainStreamObserver;

public GrpcCommandService(CommandServiceStubFactory commandServiceStubFactory, ReconnectExecutor reconnectScheduler, ProfilerCommandServiceLocator profilerCommandServiceLocator) {
this.commandServiceStubFactory = Objects.requireNonNull(commandServiceStubFactory, "commandServiceStubFactory");
Objects.requireNonNull(reconnectScheduler, "reconnectScheduler");
public GrpcCommandService(ManagedChannel channel, ReconnectExecutor reconnectScheduler, ProfilerCommandServiceLocator profilerCommandServiceLocator) {
Objects.requireNonNull(channel, "channel");
this.commandDispatchStub = ProfilerCommandServiceGrpc.newStub(channel);
this.commandChannelStub = newCommandChannelStub(commandDispatchStub, profilerCommandServiceLocator);

Objects.requireNonNull(reconnectScheduler, "reconnectScheduler");
this.profilerCommandServiceLocator = Objects.requireNonNull(profilerCommandServiceLocator, "profilerCommandServiceLocator");

this.reconnector = reconnectScheduler.newReconnector(new Runnable() {
Expand All @@ -68,27 +74,28 @@ public void run() {
connect();
}

private ProfilerCommandServiceGrpc.ProfilerCommandServiceStub newCommandChannelStub(ProfilerCommandServiceGrpc.ProfilerCommandServiceStub stub,
ProfilerCommandServiceLocator profilerCommandServiceLocator) {
Set<Short> commandServiceCodes = profilerCommandServiceLocator.getCommandServiceCodes();
final SupportCommandCodeClientInterceptor interceptor = new SupportCommandCodeClientInterceptor(commandServiceCodes);
return stub.withInterceptors(interceptor);
}

private void connect() {
if (shutdown) {
logger.info("Already shutdown");
return;
}
logger.info("Attempt to connect to CommandServiceStream");
ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub = newCommandServiceStub(commandServiceStubFactory, profilerCommandServiceLocator);
GrpcCommandDispatcher commandDispatcher = new GrpcCommandDispatcher(profilerCommandServiceStub, profilerCommandServiceLocator);
GrpcCommandDispatcher commandDispatcher = new GrpcCommandDispatcher(commandDispatchStub, profilerCommandServiceLocator);

CommandServiceMainStreamObserver commandServiceMainStreamObserver = new CommandServiceMainStreamObserver(commandDispatcher);
profilerCommandServiceStub.handleCommandV2(commandServiceMainStreamObserver);
// Requires commandServiceCodes
commandChannelStub.handleCommandV2(commandServiceMainStreamObserver);

this.commandServiceMainStreamObserver = commandServiceMainStreamObserver;
}

private ProfilerCommandServiceGrpc.ProfilerCommandServiceStub newCommandServiceStub(CommandServiceStubFactory commandServiceStubFactory, ProfilerCommandServiceLocator profilerCommandServiceLocator) {
final ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandServiceStub = commandServiceStubFactory.newStub();

final SupportCommandCodeClientInterceptor supportCommandCodeClientInterceptor = new SupportCommandCodeClientInterceptor(profilerCommandServiceLocator.getCommandServiceCodes());
return commandServiceStub.withInterceptors(supportCommandCodeClientInterceptor);
}

private void reserveReconnect() {
reconnector.reconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.profiler.metadata.MetaDataType;
import com.navercorp.pinpoint.profiler.receiver.ProfilerCommandServiceLocator;
import com.navercorp.pinpoint.profiler.receiver.grpc.CommandServiceStubFactory;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcCommandService;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -58,8 +57,8 @@ public AgentGrpcDataSender(String host, int port, int executorQueueSize,
this.agentPingStub = newAgentPingStub();

this.reconnectExecutor = reconnectExecutor;
CommandServiceStubFactory commandServiceStubFactory = new CommandServiceStubFactory(managedChannel);
this.grpcCommandService = new GrpcCommandService(commandServiceStubFactory, reconnectExecutor, profilerCommandServiceLocator);

this.grpcCommandService = new GrpcCommandService(managedChannel, reconnectExecutor, profilerCommandServiceLocator);
{
final Runnable reconnectJob = new Runnable() {
@Override
Expand Down

0 comments on commit 68e3525

Please sign in to comment.