diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java
index 4a9db8dccc0f..4e2f02702b23 100755
--- a/core/common/src/main/java/alluxio/conf/PropertyKey.java
+++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -7357,6 +7357,12 @@ public String toString() {
           .setDefaultValue("60sec")
           .setScope(Scope.MASTER)
           .build();
+  public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME =
+      durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME)
+          .setDescription("The length of time the client can trace the submitted job.")
+          .setDefaultValue("1d")
+          .setScope(Scope.MASTER)
+          .build();
   public static final PropertyKey JOB_MASTER_JOB_CAPACITY =
       longBuilder(Name.JOB_MASTER_JOB_CAPACITY)
           .setDescription("The total possible number of available job statuses in the job master. "
@@ -9199,6 +9205,8 @@ public static final class Name {
         "alluxio.job.master.finished.job.purge.count";
     public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
         "alluxio.job.master.finished.job.retention.time";
+    public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME =
+        "alluxio.job.master.job.trace.retention.time";
     public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
     public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
         "alluxio.job.master.master.heartbeat.interval";
diff --git a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java
index 2b7e6246d47f..803a718609e1 100644
--- a/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java
+++ b/job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java
@@ -13,6 +13,8 @@
 
 import alluxio.AlluxioURI;
 import alluxio.client.file.FileSystemContext;
+import alluxio.conf.Configuration;
+import alluxio.conf.PropertyKey;
 import alluxio.exception.ExceptionMessage;
 import alluxio.exception.JobDoesNotExistException;
 import alluxio.job.CmdConfig;
@@ -36,6 +38,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -43,7 +48,7 @@
  * CmdJobTracker to schedule a Cmd job to run.
  */
 @ThreadSafe
-public class CmdJobTracker {
+public class CmdJobTracker implements AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class);
   private final Map<Long, CmdInfo> mInfoMap = new ConcurrentHashMap<>(0, 0.95f,
       Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
@@ -52,6 +57,13 @@ public class CmdJobTracker {
   private final PersistRunner mPersistRunner;
   protected FileSystemContext mFsContext;
   public static final String DELIMITER = ",";
+  /** Delay before the first sweep of expired job traces, in seconds. */
+  private static final long CLEAN_INITIAL_DELAY_SEC = 60;
+  /** Interval between sweeps of expired job traces, in seconds. */
+  private static final long CLEAN_PERIOD_SEC = 600;
+  private final ScheduledExecutorService mScheduleCleanExecutor;
+  /** How long a finished job stays traceable after submission, in milliseconds. */
+  private final long mTraceRetentionTimeMs;
 
   /**
    * Create a new instance of {@link CmdJobTracker}.
@@ -64,6 +76,9 @@ public CmdJobTracker(FileSystemContext fsContext,
     mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster);
     mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster);
     mPersistRunner = new PersistRunner(mFsContext, jobMaster);
+    mTraceRetentionTimeMs = Configuration.getMs(
+        PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME);
+    mScheduleCleanExecutor = startCleanExecutor();
   }
 
   /**
@@ -72,15 +87,20 @@ public CmdJobTracker(FileSystemContext fsContext,
    * @param distLoadCliRunner DistributedLoad runner
    * @param migrateCliRunner DistributedCopy runner
    * @param persistRunner Persist runner
+   * @param retentionTime time in milliseconds that a finished job stays traceable, not null
    */
   public CmdJobTracker(FileSystemContext fsContext,
       DistLoadCliRunner distLoadCliRunner,
       MigrateCliRunner migrateCliRunner,
-      PersistRunner persistRunner) {
+      PersistRunner persistRunner,
+      Long retentionTime) {
     mFsContext = fsContext;
     mDistLoadCliRunner = distLoadCliRunner;
     mMigrateCliRunner = migrateCliRunner;
     mPersistRunner = persistRunner;
+    // Unboxing fails fast on null here rather than later inside the cleaner thread.
+    mTraceRetentionTimeMs = retentionTime;
+    mScheduleCleanExecutor = startCleanExecutor();
   }
 
   /**
@@ -270,4 +290,55 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId)
         .collect(Collectors.toList());
     return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType());
   }
+
+  /**
+   * Starts the background thread that periodically purges expired job traces.
+   *
+   * @return the executor running the periodic cleanup
+   */
+  private ScheduledExecutorService startCleanExecutor() {
+    // A daemon thread so that a tracker which is never closed cannot block JVM shutdown.
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+      Thread thread = new Thread(runnable, "cmd-job-tracker-cleaner");
+      thread.setDaemon(true);
+      return thread;
+    });
+    executor.scheduleAtFixedRate(this::cleanUnableTracedJobs,
+        CLEAN_INITIAL_DELAY_SEC, CLEAN_PERIOD_SEC, TimeUnit.SECONDS);
+    return executor;
+  }
+
+  /**
+   * Removes every finished job whose submission time is older than the trace retention time.
+   * Unfinished jobs are kept regardless of their age.
+   */
+  private void cleanUnableTracedJobs() {
+    long currentTime = System.currentTimeMillis();
+    for (CmdInfo cmdInfo : mInfoMap.values()) {
+      if (currentTime - cmdInfo.getJobSubmissionTime() <= mTraceRetentionTimeMs) {
+        continue;
+      }
+      long jobControlId = cmdInfo.getJobControlId();
+      try {
+        if (getCmdStatus(jobControlId).isFinished()) {
+          mInfoMap.remove(jobControlId);
+          LOG.debug("JobControlId:{} has been cleaned in CmdJobTracker,"
+              + " client will not trace the job anymore", jobControlId);
+        }
+      } catch (JobDoesNotExistException e) {
+        LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job"
+            + " with unexpected exception", jobControlId, e);
+      } catch (RuntimeException e) {
+        // An exception escaping a scheduleAtFixedRate task cancels all later runs, so log it
+        // and move on to the next job instead.
+        LOG.warn("Failed to clean expired JobControlId:{} in CmdJobTracker", jobControlId, e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Stops the periodic cleanup; a sweep that is already running is allowed to finish.
+    mScheduleCleanExecutor.shutdown();
+  }
 }
diff --git a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java
index dfc0575fa685..8abf8a30ec46 100644
--- a/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java
+++ b/job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java
@@ -97,6 +97,29 @@ public void runDistLoadBatchCompleteTest() throws Exception {
     Assert.assertEquals(s, Status.COMPLETED);
   }
 
+  @Test
+  public void runCleanUnableTracedJobsTest() throws Exception {
+    testFindCmdIdsForMultipleCmds();
+    Thread.sleep(90000L);
+    // the expired job has been cleaned in mInfoMap
+    mSearchingCriteria.clear();
+    mSearchingCriteria.add(Status.CANCELED);
+    Set<Long> cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
+    Assert.assertEquals(cancelCmdIds.size(), 0);
+    mSearchingCriteria.clear();
+    mSearchingCriteria.add(Status.COMPLETED);
+    Set<Long> completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
+    Assert.assertEquals(completedCmdIds.size(), 0);
+    mSearchingCriteria.clear();
+    mSearchingCriteria.add(Status.FAILED);
+    Set<Long> failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
+    Assert.assertEquals(failedCmdIds.size(), 0);
+    mSearchingCriteria.clear();
+    mSearchingCriteria.add(Status.RUNNING);
+    Set<Long> runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
+    Assert.assertEquals(runningCmdIds.size(), 3);
+  }
+
   @Test
   public void runDistLoadBatchFailTest() throws Exception {
     CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD,