Skip to content

Commit

Permalink
HDDS-11593. Improve container scanner metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghuazhu committed Oct 19, 2024
1 parent 86b7aae commit c5264f7
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public abstract class AbstractBackgroundContainerScanner extends Thread {

private final AtomicBoolean stopping;
private final AtomicBoolean pausing = new AtomicBoolean();
private int numUnHealthyEachIteration = 0;
private int numContainersScannedEachIteration = 0;

public AbstractBackgroundContainerScanner(String name,
long dataScanInterval) {
Expand All @@ -55,8 +57,6 @@ public final void run() {
try {
while (!stopping.get()) {
runIteration();
metrics.resetNumContainersScanned();
metrics.resetNumUnhealthyContainers();
}
LOG.info("{} exiting.", this);
} catch (Exception e) {
Expand All @@ -70,6 +70,8 @@ public final void run() {

@VisibleForTesting
public final void runIteration() {
numUnHealthyEachIteration = 0;
numContainersScannedEachIteration = 0;
final boolean paused = pausing.get();
long startTime = System.nanoTime();
if (!paused) {
Expand All @@ -84,6 +86,11 @@ public final void runIteration() {
} else {
AbstractContainerScannerMetrics metrics = getMetrics();
metrics.incNumScanIterations();
metrics.incTotalRunTimes(TimeUnit.NANOSECONDS.toMillis(totalDuration));
metrics.resetNumContainersScanned();
metrics.resetNumUnhealthyContainers();
metrics.incNumContainersScanned(numContainersScannedEachIteration);
metrics.incNumUnHealthyContainers(numUnHealthyEachIteration);
LOG.info("Completed an iteration in {} minutes." +
" Number of iterations (since the data-node restart) : {}" +
", Number of containers scanned in this iteration : {}" +
Expand Down Expand Up @@ -137,6 +144,14 @@ public final void handleRemainingSleep(long remainingSleep) {
}
}

public void incNumUnHealthyEachIteration() {
numUnHealthyEachIteration++;
}

public void incNumContainersScannedEachIteration() {
numContainersScannedEachIteration++;
}

/**
* Shutdown the current container scanning thread.
* If the thread is already being shutdown, the call will block until the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

/**
Expand All @@ -42,6 +43,8 @@ public abstract class AbstractContainerScannerMetrics {
private MutableGaugeInt numUnHealthyContainers;
@Metric("number of iterations of scanner completed since the restart")
private MutableCounterInt numScanIterations;
@Metric("total time that the container scanned has been running, in milliseconds.")
private MutableCounterLong totalRunTimes;

public AbstractContainerScannerMetrics(String name, MetricsSystem ms) {
this.name = name;
Expand All @@ -56,6 +59,10 @@ public void incNumContainersScanned() {
numContainersScanned.incr();
}

public void incNumContainersScanned(int delta) {
numContainersScanned.incr(delta);
}

public void resetNumContainersScanned() {
numContainersScanned.decr(getNumContainersScanned());
}
Expand All @@ -68,6 +75,10 @@ public void incNumUnHealthyContainers() {
numUnHealthyContainers.incr();
}

public void incNumUnHealthyContainers(int delta) {
numUnHealthyContainers.incr(delta);
}

public void resetNumUnhealthyContainers() {
numUnHealthyContainers.decr(getNumUnHealthyContainers());
}
Expand All @@ -80,6 +91,14 @@ public void incNumScanIterations() {
numScanIterations.incr();
}

public long getTotalRunTimes() {
return (long) totalRunTimes.value();
}

public void incTotalRunTimes(long runTime) {
totalRunTimes.incr(runTime);
}

public void unregister() {
ms.unregisterSource(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ public void scanContainer(Container<?> c)
if (!result.isHealthy()) {
LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
containerId, result.getException());
metrics.incNumUnHealthyContainers();
incNumUnHealthyEachIteration();
controller.markContainerUnhealthy(containerId, result);
}

metrics.incNumContainersScanned();
incNumContainersScannedEachIteration();
Instant now = Instant.now();
logScanCompleted(containerData, now);
controller.updateDataScanTimestamp(containerId, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ public void scanContainer(Container<?> container)
if (!result.isHealthy()) {
LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
containerID, result.getException());
metrics.incNumUnHealthyContainers();
incNumUnHealthyEachIteration();
controller.markContainerUnhealthy(containerID, result);
}

// Do not update the scan timestamp after the scan since this was just a
// metadata scan, not a full data scan.
metrics.incNumContainersScanned();
incNumContainersScannedEachIteration();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private static void performOnDemandScan(Container<?> container) {
return;
}

long startTime = System.nanoTime();
long containerId = container.getContainerData().getContainerID();
try {
ContainerData containerData = container.getContainerData();
Expand All @@ -144,12 +145,12 @@ private static void performOnDemandScan(Container<?> container) {
if (!result.isHealthy()) {
LOG.error("Corruption detected in container [{}]." +
"Marking it UNHEALTHY.", containerId, result.getException());
instance.metrics.incNumUnHealthyContainers();
getMetrics().incNumUnHealthyContainers();
instance.containerController.markContainerUnhealthy(containerId,
result);
}

instance.metrics.incNumContainersScanned();
getMetrics().incNumContainersScanned();
Instant now = Instant.now();
logScanCompleted(containerData, now);
instance.containerController.updateDataScanTimestamp(containerId, now);
Expand All @@ -160,6 +161,9 @@ private static void performOnDemandScan(Container<?> container) {
// This should only happen as part of shutdown, which will stop the
// ExecutorService.
LOG.info("On demand container scan interrupted.");
} finally {
long totalDuration = System.nanoTime() - startTime;
getMetrics().incTotalRunTimes(TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public void testScannerMetrics() {
assertEquals(1, metrics.getNumUnHealthyContainers());
}

@Test
public void testTotalRunTimes() {
scanner.runIteration();
assertTrue(scanner.getMetrics().getTotalRunTimes() > 0);
}

@Test
@Override
public void testScannerMetricsUnregisters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public void testUnhealthyContainersDetected() throws Exception {
verifyContainerMarkedUnhealthy(openContainer, never());
}

@Test
public void testTotalRunTimes() {
scanner.runIteration();
assertTrue(scanner.getMetrics().getTotalRunTimes() > 0);
}

@Test
@Override
public void testUnhealthyContainerNotRescanned() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ public void testUnhealthyContainersDetected() throws Exception {
verifyContainerMarkedUnhealthy(deletedContainer, never());
}

@Test
public void testTotalRunTimes() throws Exception {
Container<?> unhealthy = mockKeyValueContainer();
when(unhealthy.scanMetaData()).thenReturn(ScanResult.healthy());
when(unhealthy.scanData(
any(DataTransferThrottler.class), any(Canceler.class)))
.thenReturn(getUnhealthyScanResult());
scanContainer(unhealthy);
verifyContainerMarkedUnhealthy(unhealthy, atMostOnce());
OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics();
assertTrue(metrics.getTotalRunTimes() > 0);
}

/**
* A datanode will have one on-demand scanner thread for the whole process.
* When a volume fails, any the containers queued for scanning in that volume
Expand Down

0 comments on commit c5264f7

Please sign in to comment.