Skip to content

Commit

Permalink
refactor stop job
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Mar 16, 2016
1 parent 84131d8 commit dcdd679
Show file tree
Hide file tree
Showing 34 changed files with 190 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class JobServer implements Serializable {

private boolean leader;

private boolean leaderStoped;
private boolean leaderStopped;

public enum ServerStatus {
READY,
Expand All @@ -55,14 +55,14 @@ public enum ServerStatus {
STOPED,
CRASHED;

public static ServerStatus getServerStatus(final String status, final boolean disabled, final boolean stoped) {
public static ServerStatus getServerStatus(final String status, final boolean disabled, final boolean stopped) {
if (Strings.isNullOrEmpty(status)) {
return ServerStatus.CRASHED;
}
if (disabled) {
return ServerStatus.DISABLED;
}
if (stoped) {
if (stopped) {
return ServerStatus.STOPED;
}
return ServerStatus.valueOf(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ private JobServer getJobServer(final String jobName, final String leaderIp, fina
private ServerStatus getServerStatus(final String jobName, final String serverIp) {
String status = curatorRepository.getData(JobNodePath.getServerNodePath(jobName, serverIp, "status"));
boolean disabled = curatorRepository.checkExists(JobNodePath.getServerNodePath(jobName, serverIp, "disabled"));
boolean stoped = curatorRepository.checkExists(JobNodePath.getServerNodePath(jobName, serverIp, "stoped"));
return ServerStatus.getServerStatus(status, disabled, stoped);
boolean stopped = curatorRepository.checkExists(JobNodePath.getServerNodePath(jobName, serverIp, "stoped"));
return ServerStatus.getServerStatus(status, disabled, stopped);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private JobServer getJob(final String serverIp, final String jobName) {
result.setStatus(ServerStatus.getServerStatus(status, disabled, stopped));
String leaderIp = curatorRepository.getData(JobNodePath.getLeaderNodePath(jobName, "election/host"));
result.setLeader(serverIp.equals(leaderIp));
result.setLeaderStoped(curatorRepository.checkExists(JobNodePath.getServerNodePath(jobName, leaderIp, "stoped")));
result.setLeaderStopped(curatorRepository.checkExists(JobNodePath.getServerNodePath(jobName, leaderIp, "stoped")));
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @author zhangliang
*/
public interface ElasticJob extends Job, Stoppable {
public interface ElasticJob extends Job {

/**
* 处理作业执行时异常.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,44 +178,20 @@ public Date getNextFireTime() {
*/
public void stopJob() {
try {
JobRegistry.getInstance().getJobInstance(jobName).stop();
scheduler.pauseAll();
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
}

/**
* 恢复手工停止的作业.
* 恢复作业.
*/
public void resumeManualStoppedJob() {
public void resumeJob() {
try {
if (scheduler.isShutdown()) {
return;
if (!scheduler.isShutdown()) {
scheduler.resumeAll();
}
JobRegistry.getInstance().getJobInstance(jobName).resume();
scheduler.resumeAll();
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
schedulerFacade.clearJobStoppedStatus();
}

/**
* 恢复因服务器崩溃而停止的作业.
*
* <p>
* 不会恢复手工设置停止运行的作业.
* </p>
*/
public void resumeCrashedJob() {
schedulerFacade.resumeCrashedJobInfo();
if (schedulerFacade.isJobStoppedManually()) {
return;
}
JobRegistry.getInstance().getJobInstance(jobName).resume();
try {
scheduler.resumeAll();
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
* @author zhangliang
*/
public class GuaranteeService {

private final JobConfiguration jobConfiguration;

private final JobNodeStorage jobNodeStorage;

public GuaranteeService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
@Slf4j
public abstract class AbstractElasticJob implements ElasticJob {

@Getter(AccessLevel.PROTECTED)
private volatile boolean stopped;

@Getter(AccessLevel.PROTECTED)
private JobFacade jobFacade;

Expand All @@ -60,13 +57,13 @@ public final void execute(final JobExecutionContext context) throws JobExecution
}
executeJobInternal(shardingContext);
log.trace("Elastic job: execute normal completed, sharding context:{}.", shardingContext);
while (jobFacade.isExecuteMisfired(stopped, shardingContext.getShardingItems())) {
while (jobFacade.isExecuteMisfired(shardingContext.getShardingItems())) {
log.trace("Elastic job: execute misfired job, sharding context:{}.", shardingContext);
jobFacade.clearMisfire(shardingContext.getShardingItems());
executeJobInternal(shardingContext);
log.trace("Elastic job: misfired job completed, sharding context:{}.", shardingContext);
}
jobFacade.failoverIfNecessary(stopped);
jobFacade.failoverIfNecessary();
try {
jobFacade.afterJobExecuted(shardingContext);
//CHECKSTYLE:OFF
Expand Down Expand Up @@ -102,16 +99,6 @@ public void handleJobExecutionException(final JobExecutionException jobExecution
throw jobExecutionException;
}

@Override
public final void stop() {
stopped = true;
}

@Override
public final void resume() {
stopped = false;
}

public final void setJobFacade(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
JobRegistry.getInstance().addJobInstance(jobFacade.getJobName(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected final void executeJob(final JobExecutionMultipleShardingContext shardi

private void executeThroughputStreamingJob(final JobExecutionMultipleShardingContext shardingContext) {
List<T> data = fetchDataForThroughput(shardingContext);
while (null != data && !data.isEmpty() && getJobFacade().isEligibleForJobRunning(isStopped())) {
while (null != data && !data.isEmpty() && getJobFacade().isEligibleForJobRunning()) {
processDataForThroughput(shardingContext, data);
data = fetchDataForThroughput(shardingContext);
}
Expand All @@ -113,7 +113,7 @@ private void executeThroughputOneOffJob(final JobExecutionMultipleShardingContex

private void executeSequenceStreamingJob(final JobExecutionMultipleShardingContext shardingContext) {
Map<Integer, List<T>> data = fetchDataForSequence(shardingContext);
while (!data.isEmpty() && getJobFacade().isEligibleForJobRunning(isStopped())) {
while (!data.isEmpty() && getJobFacade().isEligibleForJobRunning()) {
processDataForSequence(shardingContext, data);
data = fetchDataForSequence(shardingContext);
}
Expand Down Expand Up @@ -155,11 +155,7 @@ public void run() {
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
latchAwait(latch);
}

private Map<Integer, List<T>> fetchDataForSequence(final JobExecutionMultipleShardingContext shardingContext) {
Expand All @@ -183,11 +179,7 @@ public void run() {
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
latchAwait(latch);
log.trace("Elastic job: fetch data size: {}.", result.size());
return result;
}
Expand All @@ -208,11 +200,7 @@ public void run() {
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
latchAwait(latch);
}

protected abstract void processDataWithStatistics(C shardingContext, List<T> data);
Expand All @@ -231,4 +219,12 @@ public ExecutorService getExecutorService() {
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
}

private void latchAwait(final CountDownLatch latch) {
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.failover.FailoverService;
import com.dangdang.ddframe.job.internal.offset.OffsetService;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.sharding.ShardingService;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

Expand All @@ -41,6 +42,8 @@ public class JobFacade {

private final ShardingService shardingService;

private final ServerService serverService;

private final ExecutionContextService executionContextService;

private final ExecutionService executionService;
Expand All @@ -54,6 +57,7 @@ public class JobFacade {
public JobFacade(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final List<ElasticJobListener> elasticJobListeners) {
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
shardingService = new ShardingService(coordinatorRegistryCenter, jobConfiguration);
serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
executionContextService = new ExecutionContextService(coordinatorRegistryCenter, jobConfiguration);
executionService = new ExecutionService(coordinatorRegistryCenter, jobConfiguration);
failoverService = new FailoverService(coordinatorRegistryCenter, jobConfiguration);
Expand Down Expand Up @@ -93,11 +97,9 @@ public void checkMaxTimeDiffSecondsTolerable() {

/**
* 如果需要失效转移, 则设置作业失效转移.
*
* @param stopped 作业是否需要停止
*/
public void failoverIfNecessary(final boolean stopped) {
if (configService.isFailover() && !stopped) {
public void failoverIfNecessary() {
if (configService.isFailover() && !serverService.isJobStoppedManually()) {
failoverService.failoverIfNecessary();
}
}
Expand Down Expand Up @@ -155,24 +157,22 @@ public void clearMisfire(final List<Integer> shardingItems) {
/**
* 判断作业是否需要执行错过的任务.
*
* @param stopped 作业是否需要停止
* @param shardingItems 任务分片项集合
* @return 作业是否需要执行错过的任务
*/
public boolean isExecuteMisfired(final boolean stopped, final List<Integer> shardingItems) {
return isEligibleForJobRunning(stopped) && configService.isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
public boolean isExecuteMisfired(final List<Integer> shardingItems) {
return isEligibleForJobRunning() && configService.isMisfire() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
}

/**
* 判断作业是否符合继续运行的条件.
*
* <p>如果作业停止或需要重分片则作业将不会继续运行.</p>
*
* @param stopped 作业是否需要停止
* @return 作业是否符合继续运行的条件
*/
public boolean isEligibleForJobRunning(final boolean stopped) {
return !stopped && !shardingService.isNeedSharding();
public boolean isEligibleForJobRunning() {
return !serverService.isJobStoppedManually() && !shardingService.isNeedSharding();
}

/**判断是否需要重分片.
Expand All @@ -192,7 +192,7 @@ public boolean isNeedSharding() {
public void updateOffset(final int item, final String offset) {
offsetService.updateOffset(item, offset);
}

/**
* 作业执行前的执行的方法.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,6 @@ public void releaseJobResource() {
statisticsService.stopProcessCountJob();
}

/**
* 恢复因服务器崩溃而停止的作业信息.
*/
public void resumeCrashedJobInfo() {
serverService.persistServerOnline();
executionService.clearRunningInfo(shardingService.getLocalHostShardingItems());
}

/**
* 清除停止作业的标记.
*/
public void clearJobStoppedStatus() {
serverService.clearJobStoppedStatus();
}

/**
* 判断是否是手工停止的作业.
*
* @return 是否是手工停止的作业
*/
public boolean isJobStoppedManually() {
return serverService.isJobStoppedManually();
}

/**
* 获取作业启动时间的cron表达式.
*
Expand Down
Loading

0 comments on commit dcdd679

Please sign in to comment.