Skip to content

Commit

Permalink
Fixed #367, fixed disable/enable job/app process
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohongtao committed Jun 29, 2017
1 parent c1fa796 commit 064eca0
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void childEvent(final CuratorFramework client, final TreeCacheEvent event
if (!jobConfig.getTypeConfig().getCoreConfig().isMisfire()) {
readyService.setMisfireDisabled(jobConfig.getJobName());
}
producerManager.reschedule(jobConfig);
producerManager.reschedule(jobConfig.getJobName());
} else if (isJobConfigNode(event, path, Type.NODE_REMOVED)) {
String jobName = path.substring(CloudJobConfigurationNode.ROOT.length() + 1, path.length());
producerManager.unschedule(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,19 @@ public void recordFailoverTask(final TaskContext taskContext) {
if (!jobConfigOptional.isPresent()) {
return;
}
if (isDisable(jobConfigOptional.get())) {
return;
}
CloudJobConfiguration jobConfig = jobConfigOptional.get();
if (jobConfig.getTypeConfig().getCoreConfig().isFailover() || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) {
failoverService.add(taskContext);
}
}

private boolean isDisable(final CloudJobConfiguration jobConfiguration) {
return disableAppService.isDisabled(jobConfiguration.getAppName()) || disableJobService.isDisabled(jobConfiguration.getJobName());
}

/**
* 将瞬时作业放入待执行队列.
*
Expand Down Expand Up @@ -223,6 +230,13 @@ public Optional<String> getFailoverTaskId(final MetaInfo metaInfo) {
* @param jobName 作业名称
*/
public void addDaemonJobToReadyQueue(final String jobName) {
Optional<CloudJobConfiguration> jobConfigOptional = jobConfigService.load(jobName);
if (!jobConfigOptional.isPresent()) {
return;
}
if (isDisable(jobConfigOptional.get())) {
return;
}
readyService.addDaemon(jobName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void update(final CloudJobConfiguration jobConfig) {
throw new JobConfigurationException("Cannot found job '%s', please register first.", jobConfig.getJobName());
}
configService.update(jobConfig);
reschedule(jobConfig);
reschedule(jobConfig.getJobName());
}

/**
Expand All @@ -130,7 +130,6 @@ public void deregister(final String jobName) {
if (jobConfig.isPresent()) {
disableJobService.remove(jobName);
configService.remove(jobName);
transientProducerScheduler.deregister(jobConfig.get());
}
unschedule(jobName);
}
Expand Down Expand Up @@ -162,16 +161,23 @@ public void unschedule(final String jobName) {
}
runningService.remove(jobName);
readyService.remove(Lists.newArrayList(jobName));
Optional<CloudJobConfiguration> jobConfig = configService.load(jobName);
if (jobConfig.isPresent()) {
transientProducerScheduler.deregister(jobConfig.get());
}
}

/**
* 重新调度作业.
*
* @param jobConfig 作业配置
* @param jobName 作业名称
*/
public void reschedule(final CloudJobConfiguration jobConfig) {
unschedule(jobConfig.getJobName());
schedule(jobConfig);
public void reschedule(final String jobName) {
unschedule(jobName);
Optional<CloudJobConfiguration> jobConfig = configService.load(jobName);
if (jobConfig.isPresent()) {
schedule(jobConfig.get());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private Trigger buildTrigger(final String cron) {
return TriggerBuilder.newTrigger().withIdentity(cron).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
}

void deregister(final CloudJobConfiguration jobConfig) {
synchronized void deregister(final CloudJobConfiguration jobConfig) {
repository.remove(jobConfig.getJobName());
String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
if (!repository.containsKey(buildJobKey(cron))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ public void disable(@PathParam("appName") final String appName) {
public void enable(@PathParam("appName") final String appName) throws JSONException {
if (appConfigService.load(appName).isPresent()) {
disableAppService.remove(appName);
for (CloudJobConfiguration each : jobConfigService.loadAll()) {
if (appName.equals(each.getAppName())) {
producerManager.reschedule(each.getJobName());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ public boolean isDisabled(@PathParam("jobName") final String jobName) throws JSO
@DELETE
@Path("/{jobName}/disable")
public void enable(@PathParam("jobName") final String jobName) throws JSONException {
if (configService.load(jobName).isPresent()) {
Optional<CloudJobConfiguration> configOptional = configService.load(jobName);
if (configOptional.isPresent()) {
facadeService.enableJob(jobName);
producerManager.reschedule(jobName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,31 @@ public void setUp() throws NoSuchFieldException {
public void assertChildEventWhenDataIsNull() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenIsNotConfigPath() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/other/test_job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenIsRootConfigPath() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

Expand All @@ -96,23 +96,23 @@ public void assertChildEventWhenStateIsAddAndIsConfigPath() throws Exception {
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndTransientJob() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes())));
verify(readyService, times(0)).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes())));
verify(readyService).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes())));
verify(readyService).setMisfireDisabled("test_job");
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public void assertLoadJobConfigWhenAbsent() {

@Test
public void assertAddDaemonJobToReadyQueue() {
when(jobConfigService.load("test_job")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job")));
facadeService.addDaemonJobToReadyQueue("test_job");
verify(readyService).addDaemon("test_job");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void assertUpdate() throws Exception {
public void assertDeregister() throws Exception {
when(getRegCenter().isExisted("/config/job/test_job")).thenReturn(false);
assertThat(sentRequest("http://127.0.0.1:19000/api/job/deregister", "DELETE", "test_job"), is(204));
verify(getRegCenter(), times(2)).get("/config/job/test_job");
verify(getRegCenter(), times(3)).get("/config/job/test_job");
}

@Test
Expand Down

0 comments on commit 064eca0

Please sign in to comment.