Skip to content

Commit

Permalink
[pinpoint-apm#11481] Fix DirectByteBuffer leak in ActiveThreadCount
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 9, 2024
1 parent 15af83a commit 8f6d852
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import io.grpc.stub.ClientResponseObserver;
Expand All @@ -27,7 +28,7 @@
/**
* @author Taejin Koo
*/
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes.Builder> {
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes, Empty> {

private final Logger logger = LogManager.getLogger(this.getClass());

Expand All @@ -36,23 +37,27 @@ public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<P
private final int streamObserverId;
private int sequenceId = 0;

private final PinpointClientResponseObserver<PCmdActiveThreadCountRes> clientResponseObserver;
private final PinpointClientResponseObserver<PCmdActiveThreadCountRes, Empty> clientResponseObserver;

public ActiveThreadCountStreamSocket(int streamObserverId, GrpcStreamService grpcStreamService) {
this.streamObserverId = streamObserverId;
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
this.clientResponseObserver = new PinpointClientResponseObserver<>(this);
}

@Override
public void send(PCmdActiveThreadCountRes.Builder sendBuilder) {
public PCmdStreamResponse newHeader() {
PCmdStreamResponse.Builder headerResponseBuilder = PCmdStreamResponse.newBuilder();
headerResponseBuilder.setResponseId(streamObserverId);
headerResponseBuilder.setSequenceId(getSequenceId());
sendBuilder.setCommonStreamResponse(headerResponseBuilder.build());
return headerResponseBuilder.build();
}

@Override
public void send(PCmdActiveThreadCountRes activeThreadCount) {
if (clientResponseObserver.isReady()) {
clientResponseObserver.getRequestObserver().onNext(sendBuilder.build());
clientResponseObserver.sendRequest(activeThreadCount);
} else {
logger.info("Send fail. (ActiveThreadCount) client is not ready. streamObserverId:{}", streamObserverId);
}
}

Expand Down Expand Up @@ -86,18 +91,12 @@ public void disconnect(Throwable throwable) {
}

private void close0(Throwable throwable) {
if (clientResponseObserver.isReady()) {
if (throwable == null) {
clientResponseObserver.getRequestObserver().onCompleted();
} else {
clientResponseObserver.getRequestObserver().onError(throwable);
}
}
clientResponseObserver.close(throwable);
grpcStreamService.unregister(this);
}

@Override
public ClientResponseObserver getResponseObserver() {
public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserver() {
return clientResponseObserver;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import com.navercorp.pinpoint.grpc.trace.PCommandType;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogramUtils;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TimerTask;
Expand Down Expand Up @@ -58,7 +62,8 @@ public short getCommandServiceCode() {
@Override
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub) {
ActiveThreadCountStreamSocket activeThreadCountStreamSocket = new ActiveThreadCountStreamSocket(request.getRequestId(), grpcStreamService);
profilerCommandServiceStub.commandStreamActiveThreadCount(activeThreadCountStreamSocket.getResponseObserver());
ClientResponseObserver<PCmdActiveThreadCountRes, Empty> responseObserver = activeThreadCountStreamSocket.getResponseObserver();
profilerCommandServiceStub.commandStreamActiveThreadCount(responseObserver);

grpcStreamService.register(activeThreadCountStreamSocket, new ActiveThreadCountTimerTask());
}
Expand Down Expand Up @@ -90,15 +95,21 @@ private class ActiveThreadCountTimerTask extends TimerTask {
@Override
public void run() {
if (isDebug) {
LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", grpcStreamService.getStreamSocketList());
LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));
}

PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse();
for (GrpcProfilerStreamSocket streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket != null) {
for (GrpcProfilerStreamSocket<?, ?> streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket instanceof ActiveThreadCountStreamSocket) {
try {
streamSocket.send(activeThreadCountResponseBuilder);
} catch (Exception e) {
final ActiveThreadCountStreamSocket stream = (ActiveThreadCountStreamSocket) streamSocket;

PCmdStreamResponse header = stream.newHeader();
activeThreadCountResponseBuilder.setCommonStreamResponse(header);
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();

stream.send(activeThreadCount);
} catch (Throwable e) {
LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
streamSocket.close(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
/**
* @author Taejin Koo
*/
public interface GrpcProfilerStreamSocket<T> {
public interface GrpcProfilerStreamSocket<Req, Res> {

void send(T send);
void send(Req send);

void close();

Expand All @@ -33,6 +33,6 @@ public interface GrpcProfilerStreamSocket<T> {

void disconnect(Throwable throwable);

ClientResponseObserver getResponseObserver();
ClientResponseObserver<Req, Res> getResponseObserver();

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GrpcStreamService {

private final AtomicReference<TimerTask> currentTaskReference = new AtomicReference<>();

private final List<GrpcProfilerStreamSocket<?>> grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>();
private final List<GrpcProfilerStreamSocket<?, ?>> grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>();

public GrpcStreamService(String name, long flushDelay) {
Objects.requireNonNull(name, "name");
Expand All @@ -50,11 +50,11 @@ public GrpcStreamService(String name, long flushDelay) {
this.flushDelay = flushDelay;
}

public GrpcProfilerStreamSocket<?>[] getStreamSocketList() {
public GrpcProfilerStreamSocket<?, ?>[] getStreamSocketList() {
return grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
}

public boolean register(GrpcProfilerStreamSocket<?> streamSocket, TimerTask timerTask) {
public boolean register(GrpcProfilerStreamSocket<?, ?> streamSocket, TimerTask timerTask) {
synchronized (lock) {
grpcProfilerStreamSocketList.add(streamSocket);
boolean turnOn = currentTaskReference.compareAndSet(null, timerTask);
Expand All @@ -67,7 +67,7 @@ public boolean register(GrpcProfilerStreamSocket<?> streamSocket, TimerTask time
return false;
}

public boolean unregister(GrpcProfilerStreamSocket<?> streamSocket) {
public boolean unregister(GrpcProfilerStreamSocket<?, ?> streamSocket) {
synchronized (lock) {
grpcProfilerStreamSocketList.remove(streamSocket);
// turnoff
Expand All @@ -91,8 +91,8 @@ public void close() {
timer.cancel();
}

GrpcProfilerStreamSocket<?>[] streamSockets = grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
for (GrpcProfilerStreamSocket<?> streamSocket : streamSockets) {
GrpcProfilerStreamSocket<?, ?>[] streamSockets = grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
for (GrpcProfilerStreamSocket<?, ?> streamSocket : streamSockets) {
if (streamSocket != null) {
streamSocket.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

Expand All @@ -25,14 +24,14 @@
/**
* @author Taejin Koo
*/
public class PinpointClientResponseObserver<ReqT> implements ClientResponseObserver<ReqT, Empty> {
public class PinpointClientResponseObserver<ReqT, ResT> implements ClientResponseObserver<ReqT, ResT> {

private final GrpcProfilerStreamSocket pinpointGrpcProfilerStreamSocket;
private final GrpcProfilerStreamSocket<ReqT, ResT> socket;

private volatile ClientCallStreamObserver<ReqT> requestStream;

public PinpointClientResponseObserver(GrpcProfilerStreamSocket pinpointGrpcProfilerStreamSocket) {
this.pinpointGrpcProfilerStreamSocket = Objects.requireNonNull(pinpointGrpcProfilerStreamSocket, "pinpointGrpcProfilerStreamSocket");
public PinpointClientResponseObserver(GrpcProfilerStreamSocket<ReqT, ResT> socket) {
this.socket = Objects.requireNonNull(socket, "socket");
}

@Override
Expand All @@ -41,26 +40,48 @@ public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
}

@Override
public void onNext(Empty value) {
public void onNext(ResT res) {
// do nothing
}

@Override
public void onError(Throwable t) {
pinpointGrpcProfilerStreamSocket.disconnect(t);
socket.disconnect(t);
}

@Override
public void onCompleted() {
pinpointGrpcProfilerStreamSocket.disconnect();
socket.disconnect();
}

public void sendRequest(ReqT value) {
final ClientCallStreamObserver<ReqT> copy = this.requestStream;
if (copy == null) {
return;
}
copy.onNext(value);
}

public boolean isReady() {
return requestStream != null;
final ClientCallStreamObserver<ReqT> copy = this.requestStream;
if (copy == null) {
return false;
}
return copy.isReady();
}

public ClientCallStreamObserver<ReqT> getRequestObserver() {
return requestStream;

public void close(Throwable throwable) {
final ClientCallStreamObserver<ReqT> copy = requestStream;
if (copy == null) {
return;
}

if (throwable == null) {
copy.onCompleted();
} else {
copy.onError(throwable);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.stub.ClientCallStreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PinpointClientResponseObserverTest {

@Test
void isReady_true() {
GrpcProfilerStreamSocket<String, Empty> socket = mock(GrpcProfilerStreamSocket.class);
PinpointClientResponseObserver<String, Empty> responseObserver = new PinpointClientResponseObserver<>(socket);

ClientCallStreamObserver<String> requestStream = mock(ClientCallStreamObserver.class);
when(requestStream.isReady()).thenReturn(true);
responseObserver.beforeStart(requestStream);

Assertions.assertTrue(responseObserver.isReady());
}

@Test
void isReady_false() {
GrpcProfilerStreamSocket<String, Empty> socket = mock(GrpcProfilerStreamSocket.class);
PinpointClientResponseObserver<String, Empty> responseObserver = new PinpointClientResponseObserver<>(socket);

Assertions.assertFalse(responseObserver.isReady());

ClientCallStreamObserver<String> requestStream = mock(ClientCallStreamObserver.class);
responseObserver.beforeStart(requestStream);
Assertions.assertFalse(responseObserver.isReady());
}
}

0 comments on commit 8f6d852

Please sign in to comment.