Skip to content

Commit

Permalink
fix:fix job-master leak memory when submitted a large number of DIST_…
Browse files Browse the repository at this point in the history
…LOAD jobs
  • Loading branch information
liuziqi committed Jun 27, 2024
1 parent e939a39 commit c176c77
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
8 changes: 8 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -7357,6 +7357,12 @@ public String toString() {
.setDefaultValue("60sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME =
durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME)
.setDescription("The length of time the client can trace the submitted job.")
.setDefaultValue("1d")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_CAPACITY =
longBuilder(Name.JOB_MASTER_JOB_CAPACITY)
.setDescription("The total possible number of available job statuses in the job master. "
Expand Down Expand Up @@ -9199,6 +9205,8 @@ public static final class Name {
"alluxio.job.master.finished.job.purge.count";
public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
"alluxio.job.master.finished.job.retention.time";
public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME =
"alluxio.job.master.job.trace.retention.time";
public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
"alluxio.job.master.master.heartbeat.interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import alluxio.AlluxioURI;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.job.CmdConfig;
Expand All @@ -36,14 +38,17 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;

/**
* CmdJobTracker to schedule a Cmd job to run.
*/
@ThreadSafe
public class CmdJobTracker {
public class CmdJobTracker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class);
private final Map<Long, CmdInfo> mInfoMap = new ConcurrentHashMap<>(0, 0.95f,
Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
Expand All @@ -52,6 +57,8 @@ public class CmdJobTracker {
private final PersistRunner mPersistRunner;
protected FileSystemContext mFsContext;
public static final String DELIMITER = ",";
private final ScheduledExecutorService mScheduleCleanExecutor;
private static Long sTraceRetentionTime;

/**
* Create a new instance of {@link CmdJobTracker}.
Expand All @@ -64,6 +71,11 @@ public CmdJobTracker(FileSystemContext fsContext,
mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster);
mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster);
mPersistRunner = new PersistRunner(mFsContext, jobMaster);
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanUnableTracedJobs, 60, 600, TimeUnit.SECONDS);
sTraceRetentionTime = Configuration.getMs(
PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME);
}

/**
Expand All @@ -72,15 +84,22 @@ public CmdJobTracker(FileSystemContext fsContext,
* @param distLoadCliRunner DistributedLoad runner
* @param migrateCliRunner DistributedCopy runner
* @param persistRunner Persist runner
* @param retentionTime job retention time
*/
public CmdJobTracker(FileSystemContext fsContext,
DistLoadCliRunner distLoadCliRunner,
MigrateCliRunner migrateCliRunner,
PersistRunner persistRunner) {
PersistRunner persistRunner,
Long retentionTime
) {
mFsContext = fsContext;
mDistLoadCliRunner = distLoadCliRunner;
mMigrateCliRunner = migrateCliRunner;
mPersistRunner = persistRunner;
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanUnableTracedJobs, 60, 600, TimeUnit.SECONDS);
sTraceRetentionTime = retentionTime;
}

/**
Expand Down Expand Up @@ -270,4 +289,29 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId)
.collect(Collectors.toList());
return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType());
}

private void cleanUnableTracedJobs() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, CmdInfo> x : mInfoMap.entrySet()) {
CmdInfo cmdInfo = x.getValue();
if (currentTime - cmdInfo.getJobSubmissionTime() > sTraceRetentionTime) {
try {
Status jobStatus = getCmdStatus(cmdInfo.getJobControlId());
if (jobStatus.isFinished()) {
mInfoMap.remove(cmdInfo.getJobControlId());
LOG.debug("JobControlId:{} has been cleaned in CmdJobTracker,"
+ " client will not trace the job anymore", cmdInfo.getJobControlId());
}
} catch (JobDoesNotExistException e) {
LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job"
+ "with unexpected exception", cmdInfo.getJobControlId());
}
}
}
}

@Override
public void close() throws Exception {
mScheduleCleanExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,29 @@ public void runDistLoadBatchCompleteTest() throws Exception {
Assert.assertEquals(s, Status.COMPLETED);
}

@Test
public void runCleanUnableTracedJobsTest() throws Exception {
testFindCmdIdsForMultipleCmds();
Thread.sleep(90000L);
// the expired job has been cleaned in mInfoMap
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.CANCELED);
Set<Long> cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(cancelCmdIds.size(), 0);
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.COMPLETED);
Set<Long> completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(completedCmdIds.size(), 0);
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.FAILED);
Set<Long> failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(failedCmdIds.size(), 0);
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.RUNNING);
Set<Long> runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(runningCmdIds.size(), 3);
}

@Test
public void runDistLoadBatchFailTest() throws Exception {
CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD,
Expand Down

0 comments on commit c176c77

Please sign in to comment.