Skip to content

Commit

Permalink
[#11497] Refactor ActiveThreadCountStreamSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 10, 2024
1 parent 336e81e commit 3a4325d
Show file tree
Hide file tree
Showing 15 changed files with 530 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadDumpService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadLightDumpService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcEchoService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcStreamService;
import com.navercorp.pinpoint.profiler.sender.grpc.AgentGrpcDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import io.grpc.ClientInterceptor;
Expand Down Expand Up @@ -166,12 +167,16 @@ private ProfilerCommandServiceLocator createProfilerCommandServiceLocator(Active

profilerCommandLocatorBuilder.addService(new GrpcEchoService());
if (activeTraceRepository != null) {
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadCountService(activeTraceRepository));
GrpcActiveThreadCountService grpcActiveThreadCountService = newActiveThreadCountService(activeTraceRepository);
profilerCommandLocatorBuilder.addService(grpcActiveThreadCountService);
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadDumpService(activeTraceRepository, threadDumpMapper));
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadLightDumpService(activeTraceRepository, threadDumpMapper));
}
return profilerCommandLocatorBuilder.build();
}

final ProfilerCommandServiceLocator commandServiceLocator = profilerCommandLocatorBuilder.build();
return commandServiceLocator;
private GrpcActiveThreadCountService newActiveThreadCountService(ActiveTraceRepository activeTraceRepository) {
GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", GrpcStreamService.DEFAULT_FLUSH_DELAY, activeTraceRepository);
return new GrpcActiveThreadCountService(grpcStreamService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,48 @@
package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.stream.StreamUtils;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* @author Taejin Koo
*/
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes, Empty> {
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes>, ClientResponseObserver<PCmdActiveThreadCountRes, Empty> {

private final Logger logger = LogManager.getLogger(this.getClass());

private final GrpcStreamService grpcStreamService;

private final int socketId;
private final int streamObserverId;

private int sequenceId = 0;

private final PinpointClientResponseObserver<PCmdActiveThreadCountRes, Empty> clientResponseObserver;
private ClientCallStateStreamObserver<PCmdActiveThreadCountRes> requestStream;
private volatile boolean closed = false;

public ActiveThreadCountStreamSocket(int streamObserverId, GrpcStreamService grpcStreamService) {
public ActiveThreadCountStreamSocket(int socketId, int streamObserverId,
GrpcStreamService grpcStreamService) {
this.socketId = socketId;
this.streamObserverId = streamObserverId;
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
this.clientResponseObserver = new PinpointClientResponseObserver<>(this);
}

@Override
public void beforeStart(ClientCallStreamObserver<PCmdActiveThreadCountRes> requestStream) {
this.requestStream = ClientCallStateStreamObserver.clientCall(requestStream);
}

public PCmdStreamResponse newHeader() {
Expand All @@ -54,13 +68,20 @@ public PCmdStreamResponse newHeader() {
return headerResponseBuilder.build();
}


@Override
public void send(PCmdActiveThreadCountRes activeThreadCount) {
if (clientResponseObserver.isReady()) {
clientResponseObserver.sendRequest(activeThreadCount);
public boolean send(PCmdActiveThreadCountRes activeThreadCount) {
if (closed) {
return false;

Check warning on line 75 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L75

Added line #L75 was not covered by tests
}
final CallStreamObserver<PCmdActiveThreadCountRes> request = this.requestStream;
if (request.isReady()) {
request.onNext(activeThreadCount);
return true;
} else {
logger.info("Send fail. (ActiveThreadCount) client is not ready. streamObserverId:{}", streamObserverId);
logger.info("Send fail. (ActiveThreadCount) client is not ready. socketId:{} streamObserverId:{}", socketId, streamObserverId);
}
return false;
}

private int getSequenceId() {
Expand All @@ -69,45 +90,62 @@ private int getSequenceId() {

@Override
public void close() {
logger.info("close");
close0(null);
close(null);

Check warning on line 93 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L93

Added line #L93 was not covered by tests
}

@Override
public void close(Throwable throwable) {
public void close(@Nullable Throwable throwable) {
if (closed) {
return;

Check warning on line 99 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L99

Added line #L99 was not covered by tests
}
logger.warn("close", throwable);
close0(throwable);
dispose();

StreamUtils.onCompleted(requestStream, (th) -> logger.info("close", th));
}

public boolean isClosed() {
return closed;

Check warning on line 108 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L108

Added line #L108 was not covered by tests
}


private void dispose() {
this.closed = true;
grpcStreamService.unregister(this);
}

@Override
public void disconnect() {
logger.info("disconnect");
close0(null);
public void onNext(Empty empty) {
logger.debug("onNext {}", empty);

Check warning on line 119 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L119

Added line #L119 was not covered by tests
}

@Override
public void disconnect(Throwable throwable) {
public void onError(Throwable throwable) {
Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
logger.info("disconnect. {} {}", status, metadata);
close0(throwable);
}
logger.info("onError {}. {} {}", this, status, metadata);

Check warning on line 126 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L126

Added line #L126 was not covered by tests

private void close0(Throwable throwable) {
clientResponseObserver.close(throwable);
grpcStreamService.unregister(this);
this.dispose();

Check warning on line 128 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L128

Added line #L128 was not covered by tests
if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("onError", th));

Check warning on line 130 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L130

Added line #L130 was not covered by tests
}
}

@Override
public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserver() {
return clientResponseObserver;
public void onCompleted() {
logger.info("onCompleted {}", this);

Check warning on line 136 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L136

Added line #L136 was not covered by tests

this.dispose();

Check warning on line 138 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L138

Added line #L138 was not covered by tests
if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("onCompleted", th));

Check warning on line 140 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L140

Added line #L140 was not covered by tests
}
}

@Override
public String toString() {
return "ActiveThreadCountStreamSocket{" +
"streamObserverId=" + streamObserverId +
"socketId=" + socketId +
", streamObserverId=" + streamObserverId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,29 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import com.navercorp.pinpoint.grpc.trace.PCommandType;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author Taejin Koo
*/
public class GrpcActiveThreadCountService implements ProfilerGrpcCommandService, Closeable {

private static final long DEFAULT_FLUSH_DELAY = 1000;

private final Logger logger = LogManager.getLogger(getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final ActiveTraceRepository activeTraceRepository;
private final AtomicInteger sequence = new AtomicInteger(0);

private final GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", DEFAULT_FLUSH_DELAY);
private final GrpcStreamService grpcStreamService ;

public GrpcActiveThreadCountService(ActiveTraceRepository activeTraceRepository) {
this.activeTraceRepository = Objects.requireNonNull(activeTraceRepository, "activeTraceRepository");
public GrpcActiveThreadCountService(GrpcStreamService grpcStreamService) {
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
}

@Override
Expand All @@ -59,66 +47,18 @@ public short getCommandServiceCode() {
}

@Override
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub) {
ActiveThreadCountStreamSocket activeThreadCountStreamSocket = new ActiveThreadCountStreamSocket(request.getRequestId(), grpcStreamService);
ClientResponseObserver<PCmdActiveThreadCountRes, Empty> responseObserver = activeThreadCountStreamSocket.getResponseObserver();
profilerCommandServiceStub.commandStreamActiveThreadCount(responseObserver);
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandServiceStub) {
ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(sequence.getAndIncrement(), request.getRequestId(), grpcStreamService);
commandServiceStub.commandStreamActiveThreadCount(socket);

Check warning on line 52 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L51-L52

Added lines #L51 - L52 were not covered by tests

grpcStreamService.register(activeThreadCountStreamSocket, new ActiveThreadCountTimerTask());
grpcStreamService.register(socket);

Check warning on line 54 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L54

Added line #L54 was not covered by tests
}

private PCmdActiveThreadCountRes.Builder getActiveThreadCountResponse() {
final long currentTime = System.currentTimeMillis();
final ActiveTraceHistogram histogram = activeTraceRepository.getActiveTraceHistogram(currentTime);

PCmdActiveThreadCountRes.Builder responseBuilder = PCmdActiveThreadCountRes.newBuilder();
responseBuilder.setTimeStamp(currentTime);
responseBuilder.setHistogramSchemaType(histogram.getHistogramSchema().getTypeCode());

final List<Integer> activeTraceCountList = histogram.getCounter();
for (Integer activeTraceCount : activeTraceCountList) {
responseBuilder.addActiveThreadCount(activeTraceCount);
}

return responseBuilder;
}

@Override
public void close() throws IOException {
logger.info("close");
grpcStreamService.close();
}

private class ActiveThreadCountTimerTask extends TimerTask {

@Override
public void run() {
if (isDebug) {
logger.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));
}

PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse();
for (GrpcProfilerStreamSocket<?, ?> streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket instanceof ActiveThreadCountStreamSocket) {
try {
final ActiveThreadCountStreamSocket stream = (ActiveThreadCountStreamSocket) streamSocket;

PCmdStreamResponse header = stream.newHeader();
activeThreadCountResponseBuilder.setCommonStreamResponse(header);
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();

stream.send(activeThreadCount);
if (isDebug) {
logger.debug("ActiveThreadCountStreamSocket. {}", stream);
}
} catch (Throwable e) {
logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
streamSocket.close(e);
}
}
}
}

}

}
Loading

0 comments on commit 3a4325d

Please sign in to comment.