Skip to content

Commit

Permalink
HDDS-10268. [hsync] Add OpenTracing traces to client side read path (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jojochuang authored Apr 10, 2024
1 parent 14f2452 commit cedb459
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.ozone.OzoneConsts;

import java.util.Map;
Expand All @@ -51,6 +52,11 @@ public final class ContainerClientMetrics {
private MutableCounterLong totalWriteChunkCalls;
@Metric
private MutableCounterLong totalWriteChunkBytes;
private MutableQuantiles[] listBlockLatency;
private MutableQuantiles[] getBlockLatency;
private MutableQuantiles[] getCommittedBlockLengthLatency;
private MutableQuantiles[] readChunkLatency;
private MutableQuantiles[] getSmallFileLatency;
private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
Expand Down Expand Up @@ -84,6 +90,36 @@ private ContainerClientMetrics() {
writeChunkCallsByPipeline = new ConcurrentHashMap<>();
writeChunkBytesByPipeline = new ConcurrentHashMap<>();
writeChunksCallsByLeaders = new ConcurrentHashMap<>();

listBlockLatency = new MutableQuantiles[3];
getBlockLatency = new MutableQuantiles[3];
getCommittedBlockLengthLatency = new MutableQuantiles[3];
readChunkLatency = new MutableQuantiles[3];
getSmallFileLatency = new MutableQuantiles[3];
int[] intervals = {60, 300, 900};
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
listBlockLatency[i] = registry
.newQuantiles("listBlockLatency" + interval
+ "s", "ListBlock latency in microseconds", "ops",
"latency", interval);
getBlockLatency[i] = registry
.newQuantiles("getBlockLatency" + interval
+ "s", "GetBlock latency in microseconds", "ops",
"latency", interval);
getCommittedBlockLengthLatency[i] = registry
.newQuantiles("getCommittedBlockLengthLatency" + interval
+ "s", "GetCommittedBlockLength latency in microseconds",
"ops", "latency", interval);
readChunkLatency[i] = registry
.newQuantiles("readChunkLatency" + interval
+ "s", "ReadChunk latency in microseconds", "ops",
"latency", interval);
getSmallFileLatency[i] = registry
.newQuantiles("getSmallFileLatency" + interval
+ "s", "GetSmallFile latency in microseconds", "ops",
"latency", interval);
}
}

public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
Expand Down Expand Up @@ -111,7 +147,48 @@ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
totalWriteChunkBytes.incr(chunkSizeBytes);
}

MutableCounterLong getTotalWriteChunkBytes() {
/**
 * Folds {@code latency} into every initialized quantile estimator of the
 * given window set. Entries that were never registered are skipped so a
 * partially-initialized metrics object never throws.
 *
 * @param quantiles per-interval estimators (elements may be null)
 * @param latency   observed latency; units match the registered metric
 *                  description (microseconds)
 */
private static void addToQuantiles(MutableQuantiles[] quantiles,
    long latency) {
  for (MutableQuantiles q : quantiles) {
    if (q != null) {
      q.add(latency);
    }
  }
}

/** Records the latency of one ListBlock RPC, in microseconds. */
public void addListBlockLatency(long latency) {
  addToQuantiles(listBlockLatency, latency);
}

/** Records the latency of one GetBlock RPC, in microseconds. */
public void addGetBlockLatency(long latency) {
  addToQuantiles(getBlockLatency, latency);
}

/** Records the latency of one GetCommittedBlockLength RPC, in microseconds. */
public void addGetCommittedBlockLengthLatency(long latency) {
  addToQuantiles(getCommittedBlockLengthLatency, latency);
}

/** Records the latency of one ReadChunk RPC, in microseconds. */
public void addReadChunkLatency(long latency) {
  addToQuantiles(readChunkLatency, latency);
}

/** Records the latency of one GetSmallFile RPC, in microseconds. */
public void addGetSmallFileLatency(long latency) {
  addToQuantiles(getSmallFileLatency, latency);
}

@VisibleForTesting
public MutableCounterLong getTotalWriteChunkBytes() {
  // Exposes the raw counter so unit tests can assert on accumulated
  // write-chunk byte counts; not intended for production callers.
  return totalWriteChunkBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -64,6 +67,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -128,6 +132,10 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient,
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}

ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
Expand All @@ -146,14 +154,17 @@ static <T> T tryEachDatanode(Pipeline pipeline,
try {
return op.apply(d);
} catch (IOException e) {
Span span = GlobalTracer.get().activeSpan();
if (e instanceof StorageContainerException) {
StorageContainerException sce = (StorageContainerException)e;
// Block token expired. There's no point retrying other DN.
// Throw the exception to request a new block token right away.
if (sce.getResult() == BLOCK_TOKEN_VERIFICATION_FAILED) {
span.log("block token verification failed at DN " + d);
throw e;
}
}
span.log("failed to connect to DN " + d);
excluded.add(d);
if (excluded.size() < pipeline.size()) {
LOG.warn(toErrorMessage.apply(d)
Expand Down Expand Up @@ -211,6 +222,10 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
List<Validator> validators,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails datanode) throws IOException {
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString()).build();
ContainerCommandResponseProto response =
Expand Down Expand Up @@ -246,6 +261,10 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
Expand Down Expand Up @@ -319,21 +338,35 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
builder.setEncodedToken(token.encodeToUrlString());
}

return tryEachDatanode(xceiverClient.getPipeline(),
d -> readChunk(xceiverClient, chunk, blockID,
validators, builder, d),
d -> toErrorMessage(chunk, blockID, d));
Span span = GlobalTracer.get()
.buildSpan("readChunk").start();
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
span.setTag("offset", chunk.getOffset())
.setTag("length", chunk.getLen())
.setTag("block", blockID.toString());
return tryEachDatanode(xceiverClient.getPipeline(),
d -> readChunk(xceiverClient, chunk, blockID,
validators, builder, d),
d -> toErrorMessage(chunk, blockID, d));
} finally {
span.finish();
}
}

private static ContainerProtos.ReadChunkResponseProto readChunk(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
List<Validator> validators,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails d) throws IOException {
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(d.getUuidString()).build();
ContainerCommandRequestProto.Builder requestBuilder = builder
.setDatanodeUuid(d.getUuidString());
Span span = GlobalTracer.get().activeSpan();
String traceId = TracingUtil.exportSpan(span);
if (traceId != null) {
requestBuilder = requestBuilder.setTraceID(traceId);
}
ContainerCommandResponseProto reply =
xceiverClient.sendCommand(request, validators);
xceiverClient.sendCommand(requestBuilder.build(), validators);
final ReadChunkResponseProto response = reply.getReadChunk();
final long readLen = getLen(response);
if (readLen != chunk.getLen()) {
Expand Down Expand Up @@ -515,6 +548,11 @@ public static void createContainer(XceiverClientSpi client,
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
request.setTraceID(traceId);
}

request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(createRequest.build());
Expand Down Expand Up @@ -544,6 +582,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID,
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
request.setTraceID(traceId);
}
client.sendCommand(request.build(), getValidatorList());
}

Expand All @@ -566,6 +608,10 @@ public static void closeContainer(XceiverClientSpi client,
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
request.setTraceID(traceId);
}
client.sendCommand(request.build(), getValidatorList());
}

Expand All @@ -589,6 +635,10 @@ public static ReadContainerResponseProto readContainer(
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
request.setTraceID(traceId);
}
ContainerCommandResponseProto response =
client.sendCommand(request.build(), getValidatorList());

Expand Down Expand Up @@ -624,6 +674,10 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
client.sendCommand(request, getValidatorList());
Expand Down Expand Up @@ -694,6 +748,10 @@ public static List<Validator> toValidatorList(Validator validator) {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
ContainerCommandRequestProto request = builder.build();
Map<DatanodeDetails, ContainerCommandResponseProto> responses =
xceiverClient.sendCommandOnAllNodes(request);
Expand All @@ -719,6 +777,10 @@ public static List<Validator> toValidatorList(Validator validator) {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
request.setTraceID(traceId);
}
Map<DatanodeDetails, ContainerCommandResponseProto> responses =
client.sendCommandOnAllNodes(request.build());
for (Map.Entry<DatanodeDetails, ContainerCommandResponseProto> entry :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ public static boolean isTracingEnabled(
ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
}

/**
 * Runs {@code runnable} inside a newly started tracing span named
 * {@code spanName}; the span is activated and finished by the delegate.
 *
 * @param spanName name to give the new span
 * @param runnable the work to execute inside the span
 * @throws E propagated unchanged from {@code runnable}
 */
public static <E extends Exception> void executeInNewSpan(String spanName,
    CheckedRunnable<E> runnable) throws E {
  executeInSpan(GlobalTracer.get().buildSpan(spanName).start(), runnable);
}

/**
* Execute {@code supplier} inside an activated new span.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ public void onRemoval(
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, ecReconstructExecutor);
this.clientMetrics = ContainerClientMetrics.acquire();

TracingUtil.initTracing("client", conf);
}

public XceiverClientFactory getXceiverClientManager() {
Expand Down
Loading

0 comments on commit cedb459

Please sign in to comment.