Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed bug #158

Merged
merged 7 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public enum CodeEnum implements CodeExceptionAssert {
HTTP_PROCESSOR_STRING_V_INVALID(422, "Http string value cat not be empty!"),
JOB_CRON_INTERVAL_INVALID(423, "Job cron interval invalid!"),
JOB_FIXED_RATE_INTERVAL_INVALID(424, "Job fixed rate interval invalid!"),
JOB_SECOND_DELAY_INTERVAL_INVALID(425, "Job second delay interval invalid!"),

// Delay
DELAY_TOPIC_EXIST(500, "Topic is exist!"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ private PageVO<ListTaskVO> getTaskListBySecond(ListTaskRequest request) {

// Pull task from worker at first page
if (NumberUtils.INTEGER_ZERO.equals(request.getPage() - 1)) {
Long pullCircleId = 0L;
JobInstanceTask latestParentTask = this.jobInstanceTaskDAO.getLatestParentTask(request.getJobInstanceId(), TaskConstant.DEFAULT_PARENT_ID);
if (Objects.isNull(latestParentTask)) {
return pageVO;
if (Objects.nonNull(latestParentTask)) {
pullCircleId = latestParentTask.getCircleId();
}

// Add pull tasks
List<ListTaskVO> taskList = this.pullTaskListFromWorker(latestParentTask.getCircleId(), jobInstance);
List<ListTaskVO> taskList = this.pullTaskListFromWorker(pullCircleId, jobInstance);
pageVO.getList().addAll(0, taskList);
}
return pageVO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public PageVO<ListJobVO> getPageList(ListJobRequest request) {
* @param request request
*/
private void validAndInitJob(AddJobRequest request) {
// Processor type valid
if (ProcessorTypeEnum.isShell(request.getProcessorType())) {
// Shell
if (StringUtils.isBlank(request.getShellProcessorType())) {
Expand Down Expand Up @@ -325,8 +326,9 @@ private void validAndInitJob(AddJobRequest request) {
request.setProcessorInfo(JsonUtil.encode(httpProcessor));
}

// ShardingParams
// Execute type valid
if (ExecuteTypeEnum.isSharding(request.getExecuteType())) {
// ShardingParams
CodeEnum.SHARDING_PARAMS_INVALID.assertIsFalse(StringUtils.isBlank(request.getShardingParams()));

// Format
Expand All @@ -338,20 +340,25 @@ private void validAndInitJob(AddJobRequest request) {
request.setParams(request.getShardingParams());
}

// Cron
// Time expression type valid
if (TimeExpressionTypeEnum.isCron(request.getTimeExpressionType())) {
// Cron
long one = this.parseTimeExpression(request.getTimeExpression(), null);
long two = this.parseTimeExpression(request.getTimeExpression(), new Date(one * 1000L));
if ((two - one) < TimeUnit.MINUTES.toSeconds(1)) {
CodeEnum.JOB_CRON_INTERVAL_INVALID.throwException();
}
}

// Fixed rate
if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) {
} else if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) {
// Fixed rate
if (TimeUnit.MINUTES.toSeconds(1) >= request.getTimeExpressionValue()) {
CodeEnum.JOB_FIXED_RATE_INTERVAL_INVALID.throwException();
}
} else if (TimeExpressionTypeEnum.isSecondDelay(request.getTimeExpressionType())) {
// Second delay
long delay = Optional.ofNullable(request.getTimeExpressionValue()).orElse(0L);
if (delay <= 0 || delay > 60) {
CodeEnum.JOB_SECOND_DELAY_INTERVAL_INVALID.throwException();
}
}
}

Expand Down Expand Up @@ -392,6 +399,9 @@ private void updateJobBySecond(Job updateJob) {
private void createSecondJobInstance(Job job) {
Long timestamp = DateUtil.timestamp();
JobInstance jobInstance = BeanMapperUtil.map(job, JobInstance.class);

// Fixed save to update bug
jobInstance.setId(null);
jobInstance.setJobId(job.getId());
jobInstance.setDeleteTime(0L);
jobInstance.setDeleted(CommonConstant.NO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,16 @@ public AbstractRunnable(Long currentSlotId) {
public void setFinish(Boolean finish) {
this.finish.set(finish);
}

/**
* Fail sleep
*/
protected void failSleep(){
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("List delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Delete list delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Range delay fail instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Status list delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void run() {
break;
} catch (Throwable ex) {
log.error("Range delay instance failed!", ex);
this.failSleep();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ ${AnsiColor.GREEN}
| |_| | | |_) | | __/ | | | | | | | (_) | | |_) |
\___/ | .__/ \___| |_| |_| _/ | \___/ |_.__/
|_| |__/
:: Version :: Openjob Server(v1.0.6) Spring Boot(v${spring-boot.version}) Akka(v2.6.19)
:: Version :: Openjob Server(v1.0.7) Spring Boot(v${spring-boot.version}) Akka(v2.6.19)
:: Website :: https://openjob.io
:: Author :: https://github.com/stelin
:: Github :: https://github.com/open-job/openjob

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
ALTER TABLE `job_instance`
ADD `dispatch_version` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Dispatch version' AFTER `last_report_time`;
ALTER TABLE `job_instance`
ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_once`;

ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_type`;

#`job_instance_task`
# ------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
akka {
# Dead letters log
log-dead-letters = off

# Coordinated configure
coordinated-shutdown {
terminate-actor-system = on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ public void completeTask() throws InterruptedException {
// Remove task from manager
this.removeTaskFromManager();

// Not second delay task or execute once
if (!this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) {
// Stop complete: any task to destroy task container
// Normal complete: not second delay task or execute once to destroy task container
if (this.stopping.get() > 0 || !this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) {
// When task complete reset status.
this.running.set(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void stop(Integer type) {
// Second delay to shut down scheduler
if (this.isSecondDelay()) {
this.secondDelayService.shutdown();
return;
}

super.stop(type);
Expand Down
Loading