HDDS-9819. Recon - Potential memory overflow in Container Health Task.
devmadhuu authored Jan 14, 2024
1 parent b932e16 commit 1398f58
Showing 3 changed files with 45 additions and 15 deletions.
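The change below replaces Recon's single containerManager.getContainers() call — which materialized every container in the cluster into one list — with batched fetches of FETCH_COUNT containers at a time, keyed by the last ContainerID seen. The following is a minimal, self-contained sketch of that pagination pattern; every name in it (BatchedScanSketch, fetchPage, STORE) is an illustrative stand-in rather than an Ozone API, with FETCH_COUNT, ContainerID.valueOf, and ContainerManager#getContainers(startID, count) playing the analogous roles in the actual diff.

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
 * Minimal sketch of the batched-fetch pattern this commit introduces.
 * The in-memory STORE and fetchPage are stand-ins for SCM's
 * ContainerManager#getContainers(startID, count).
 */
public final class BatchedScanSketch {

  // Pretend container IDs 1..2500 exist in the cluster.
  private static final List<Long> STORE =
      LongStream.rangeClosed(1, 2500).boxed().collect(Collectors.toList());

  // Stand-in for ContainerManager#getContainers(startID, count).
  private static List<Long> fetchPage(long startId, int count) {
    return STORE.stream()
        .filter(id -> id >= startId)
        .limit(count)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    final int fetchCount = 1000;      // analogous to FETCH_COUNT
    long startId = 1;                 // analogous to ContainerID.valueOf(1)
    long iterations = 0;
    List<Long> processed = new ArrayList<>();

    List<Long> batch = fetchPage(startId, fetchCount);
    while (!batch.isEmpty()) {
      processed.addAll(batch);        // per-batch processing goes here
      if (batch.size() >= fetchCount) {
        // Full page: resume just past the last ID seen in this batch.
        startId = batch.get(batch.size() - 1) + 1;
        batch = fetchPage(startId, fetchCount);
      } else {
        // Short page: nothing left to fetch, end the loop.
        batch = new ArrayList<>();
      }
      iterations++;
    }
    System.out.println(processed.size() + " containers in "
        + iterations + " iterations"); // prints: 2500 containers in 3 iterations
  }
}

The loop stops on the first short page: a batch smaller than the fetch count means the source has been exhausted, which is the same termination condition the patch uses in checkAndProcessContainers.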
File 1 of 3:
@@ -72,8 +72,8 @@ public void init() throws Exception {
     taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15));
     conf.setFromObject(taskConfig);
 
-    conf.set("ozone.scm.stale.node.interval", "10s");
-    conf.set("ozone.scm.dead.node.interval", "20s");
+    conf.set("ozone.scm.stale.node.interval", "6s");
+    conf.set("ozone.scm.dead.node.interval", "10s");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
         .includeRecon(true).build();
     cluster.waitForClusterToBeReady();
@@ -102,9 +102,6 @@ public void testSyncSCMContainerInfo() throws Exception {
     final ContainerInfo container2 = scmContainerManager.allocateContainer(
         RatisReplicationConfig.getInstance(
             HddsProtos.ReplicationFactor.ONE), "admin");
-    reconContainerManager.allocateContainer(
-        RatisReplicationConfig.getInstance(
-            HddsProtos.ReplicationFactor.ONE), "admin");
     scmContainerManager.updateContainerState(container1.containerID(),
         HddsProtos.LifeCycleEvent.FINALIZE);
     scmContainerManager.updateContainerState(container2.containerID(),
File 2 of 3:
@@ -53,6 +53,7 @@
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;
 
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
+  public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);
 
   private ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -131,8 +133,24 @@ public void triggerContainerHealthCheck() {
       LOG.info("Container Health task thread took {} milliseconds to" +
               " process {} existing database records.",
           Time.monotonicNow() - start, existingCount);
+
+      checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+      processedContainers.clear();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  private void checkAndProcessContainers(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap, long currentTime) {
+    ContainerID startID = ContainerID.valueOf(1);
+    List<ContainerInfo> containers = containerManager.getContainers(startID,
+        FETCH_COUNT);
+    long start;
+    long iterationCount = 0;
+    while (!containers.isEmpty()) {
       start = Time.monotonicNow();
-      final List<ContainerInfo> containers = containerManager.getContainers();
       containers.stream()
           .filter(c -> !processedContainers.contains(c))
           .forEach(c -> processContainer(c, currentTime,
@@ -142,10 +160,19 @@ public void triggerContainerHealthCheck() {
           " processing {} containers.", Time.monotonicNow() - start,
           containers.size());
       logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
-      processedContainers.clear();
-    } finally {
-      lock.writeLock().unlock();
+      if (containers.size() >= FETCH_COUNT) {
+        startID = ContainerID.valueOf(
+            containers.get(containers.size() - 1).getContainerID() + 1);
+        containers = containerManager.getContainers(startID, FETCH_COUNT);
+      } else {
+        containers.clear();
+      }
+      iterationCount++;
     }
+    LOG.info(
+        "Container Health task thread took {} iterations to fetch all " +
+            "containers using batched approach with batch size of {}",
+        iterationCount, FETCH_COUNT);
   }
 
   private void logUnhealthyContainerStats(
File 3 of 3:
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.ozone.recon.fsck;
 
-import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -96,7 +98,8 @@ public void testRun() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(7);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -151,7 +154,7 @@ public void testRun() throws Exception {
         reconTaskStatusDao, containerHealthSchemaManager,
         placementMock, reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
-    LambdaTestUtils.await(6000, 1000, () ->
+    LambdaTestUtils.await(60000, 1000, () ->
         (unHealthyContainersTableHandle.count() == 6));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
@@ -192,7 +195,8 @@ public void testRun() throws Exception {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
 
     // Now run the job again, to check that relevant records are updated or
     // removed as appropriate. Need to adjust the return value for all the mocks
@@ -267,7 +271,8 @@ public void testDeletedContainer() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(3);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
@@ -327,7 +332,8 @@ public void testDeletedContainer() throws Exception {
 
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
   }
 
   private Set<ContainerReplica> getMockReplicas(
