Skip to content

Commit

Permalink
Merge pull request #158 from stelin/1.x
Browse files Browse the repository at this point in the history
Fixed bug
  • Loading branch information
stelin authored Sep 7, 2023
2 parents 5c06a3c + faf96ba commit 23d661b
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public enum CodeEnum implements CodeExceptionAssert {
HTTP_PROCESSOR_STRING_V_INVALID(422, "Http string value cat not be empty!"),
JOB_CRON_INTERVAL_INVALID(423, "Job cron interval invalid!"),
JOB_FIXED_RATE_INTERVAL_INVALID(424, "Job fixed rate interval invalid!"),
JOB_SECOND_DELAY_INTERVAL_INVALID(425, "Job second delay interval invalid!"),

// Delay
DELAY_TOPIC_EXIST(500, "Topic is exist!"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ private PageVO<ListTaskVO> getTaskListBySecond(ListTaskRequest request) {

// Pull task from worker at first page
if (NumberUtils.INTEGER_ZERO.equals(request.getPage() - 1)) {
Long pullCircleId = 0L;
JobInstanceTask latestParentTask = this.jobInstanceTaskDAO.getLatestParentTask(request.getJobInstanceId(), TaskConstant.DEFAULT_PARENT_ID);
if (Objects.isNull(latestParentTask)) {
return pageVO;
if (Objects.nonNull(latestParentTask)) {
pullCircleId = latestParentTask.getCircleId();
}

// Add pull tasks
List<ListTaskVO> taskList = this.pullTaskListFromWorker(latestParentTask.getCircleId(), jobInstance);
List<ListTaskVO> taskList = this.pullTaskListFromWorker(pullCircleId, jobInstance);
pageVO.getList().addAll(0, taskList);
}
return pageVO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public PageVO<ListJobVO> getPageList(ListJobRequest request) {
* @param request request
*/
private void validAndInitJob(AddJobRequest request) {
// Processor type valid
if (ProcessorTypeEnum.isShell(request.getProcessorType())) {
// Shell
if (StringUtils.isBlank(request.getShellProcessorType())) {
Expand Down Expand Up @@ -325,8 +326,9 @@ private void validAndInitJob(AddJobRequest request) {
request.setProcessorInfo(JsonUtil.encode(httpProcessor));
}

// ShardingParams
// Execute type valid
if (ExecuteTypeEnum.isSharding(request.getExecuteType())) {
// ShardingParams
CodeEnum.SHARDING_PARAMS_INVALID.assertIsFalse(StringUtils.isBlank(request.getShardingParams()));

// Format
Expand All @@ -338,20 +340,25 @@ private void validAndInitJob(AddJobRequest request) {
request.setParams(request.getShardingParams());
}

// Cron
// Time expression type valid
if (TimeExpressionTypeEnum.isCron(request.getTimeExpressionType())) {
// Cron
long one = this.parseTimeExpression(request.getTimeExpression(), null);
long two = this.parseTimeExpression(request.getTimeExpression(), new Date(one * 1000L));
if ((two - one) < TimeUnit.MINUTES.toSeconds(1)) {
CodeEnum.JOB_CRON_INTERVAL_INVALID.throwException();
}
}

// Fixed rate
if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) {
} else if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) {
// Fixed rate
if (TimeUnit.MINUTES.toSeconds(1) >= request.getTimeExpressionValue()) {
CodeEnum.JOB_FIXED_RATE_INTERVAL_INVALID.throwException();
}
} else if (TimeExpressionTypeEnum.isSecondDelay(request.getTimeExpressionType())) {
// Second delay
long delay = Optional.ofNullable(request.getTimeExpressionValue()).orElse(0L);
if (delay <= 0 || delay > 60) {
CodeEnum.JOB_SECOND_DELAY_INTERVAL_INVALID.throwException();
}
}
}

Expand Down Expand Up @@ -392,6 +399,9 @@ private void updateJobBySecond(Job updateJob) {
private void createSecondJobInstance(Job job) {
Long timestamp = DateUtil.timestamp();
JobInstance jobInstance = BeanMapperUtil.map(job, JobInstance.class);

// Fixed save to update bug
jobInstance.setId(null);
jobInstance.setJobId(job.getId());
jobInstance.setDeleteTime(0L);
jobInstance.setDeleted(CommonConstant.NO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,16 @@ public AbstractRunnable(Long currentSlotId) {
public void setFinish(Boolean finish) {
this.finish.set(finish);
}

/**
* Fail sleep
*/
protected void failSleep(){
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("List delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Delete list delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Range delay fail instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Status list delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Range delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ ${AnsiColor.GREEN}
| |_| | | |_) | | __/ | | | | | | | (_) | | |_) |
\___/ | .__/ \___| |_| |_| _/ | \___/ |_.__/
|_| |__/
:: Version :: Openjob Server(v1.0.6) Spring Boot(v${spring-boot.version}) Akka(v2.6.19)
:: Version :: Openjob Server(v1.0.7) Spring Boot(v${spring-boot.version}) Akka(v2.6.19)
:: Website :: https://openjob.io
:: Author :: https://github.com/stelin
:: Github :: https://github.com/open-job/openjob

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
ALTER TABLE `job_instance`
ADD `dispatch_version` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Dispatch version' AFTER `last_report_time`;
ALTER TABLE `job_instance`
ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_once`;

ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_type`;

#`job_instance_task`
# ------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
akka {
# Dead letters log
log-dead-letters = off

# Coordinated configure
coordinated-shutdown {
terminate-actor-system = on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ public void completeTask() throws InterruptedException {
// Remove task from manager
this.removeTaskFromManager();

// Not second delay task or execute once
if (!this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) {
// Stop complete: any task to destroy task container
// Normal complete: not second delay task or execute once to destroy task container
if (this.stopping.get() > 0 || !this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) {
// When task complete reset status.
this.running.set(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void stop(Integer type) {
// Second delay to shut down scheduler
if (this.isSecondDelay()) {
this.secondDelayService.shutdown();
return;
}

super.stop(type);
Expand Down

0 comments on commit 23d661b

Please sign in to comment.