Skip to content

Commit

Permalink
For #156 refactor job facade.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Nov 9, 2016
1 parent 350f2c5 commit f9bf288
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEvent;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.dangdang.ddframe.job.exception.JobExecutionEnvironmentException;
import com.dangdang.ddframe.job.executor.JobFacade;
import com.dangdang.ddframe.job.executor.ShardingContexts;
Expand Down Expand Up @@ -104,7 +105,11 @@ public void afterJobExecuted(final ShardingContexts shardingContexts) {
}

@Override
public void postJobEvent(final JobEvent jobEvent) {
jobEventBus.post(jobEvent);
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
jobEventBus.post(jobExecutionEvent);
}

@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void assertAfterJobExecuted() {

@Test
public void assertPostJobEvent() {
jobFacade.postJobEvent(null);
jobFacade.postJobExecutionEvent(null);
verify(eventBus).post(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.dangdang.ddframe.job.cloud.scheduler.context.TaskContext;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.netflix.fenzo.TaskScheduler;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -81,8 +82,8 @@ public void statusUpdate(final SchedulerDriver schedulerDriver, final Protos.Tas
TaskContext taskContext = TaskContext.from(taskId);
log.trace("call statusUpdate task state is: {}, task id is: {}", taskStatus.getState(), taskId);
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(),
taskContext.getType().name(), String.valueOf(taskContext.getMetaInfo().getShardingItem()),
State.valueOf(taskStatus.getState().name()), String.format("source is: %s, message is: %s.", taskStatus.getSource(), taskStatus.getMessage())));
taskContext.getType().name(), String.valueOf(taskContext.getMetaInfo().getShardingItem()),
Source.CLOUD_SCHEDULER, State.valueOf(taskStatus.getState().name()), String.format("source is: %s, message is: %s.", taskStatus.getSource(), taskStatus.getMessage())));
switch (taskStatus.getState()) {
case TASK_RUNNING:
if ("BEGIN".equals(taskStatus.getMessage())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dangdang.ddframe.job.cloud.scheduler.context.TaskContext;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.util.concurrent.BlockUtils;
Expand Down Expand Up @@ -90,7 +91,7 @@ public void run() {
TaskContext taskContext = TaskContext.from(taskInfo.getTaskId().getValue());
facadeService.addRunning(taskContext);
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(),
taskContext.getType().name(), String.valueOf(taskContext.getMetaInfo().getShardingItem()),
taskContext.getType().name(), String.valueOf(taskContext.getMetaInfo().getShardingItem()), Source.CLOUD_SCHEDULER,
State.TASK_STAGING, String.format("task info is: %s", taskInfo)));
}
facadeService.removeLaunchTasksFromQueue(Lists.transform(taskInfoList, new Function<TaskInfo, TaskContext>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class JobStatusTraceEvent implements JobEvent {

private final String shardingItem;

private final Source source;

private final State state;

private final String message;
Expand All @@ -44,4 +46,8 @@ public class JobStatusTraceEvent implements JobEvent {
public enum State {
TASK_STAGING, TASK_RUNNING, TASK_FINISHED, TASK_KILLED, TASK_LOST, TASK_FAILED, TASK_ERROR
}

public enum Source {
CLOUD_SCHEDULER, CLOUD_EXECUTOR, LITE_EXECUTOR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void process(final ShardingContexts shardingContexts, final JobExecution
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
Expand All @@ -171,7 +171,7 @@ public void run() {
}

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent jobExecutionEvent) {
jobFacade.postJobEvent(jobExecutionEvent);
jobFacade.postJobExecutionEvent(jobExecutionEvent);
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
try {
process(new ShardingContext(shardingContexts, item));
Expand All @@ -183,7 +183,7 @@ private void process(final ShardingContexts shardingContexts, final int item, fi
jobExecutionEvent.executionFailure(ex);
jobExceptionHandler.handleException(jobName, ex);
} finally {
jobFacade.postJobEvent(jobExecutionEvent);
jobFacade.postJobExecutionEvent(jobExecutionEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package com.dangdang.ddframe.job.executor;

import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.event.JobEvent;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.exception.JobExecutionEnvironmentException;

import java.util.Collection;
Expand Down Expand Up @@ -130,9 +131,18 @@ public interface JobFacade {
void afterJobExecuted(ShardingContexts shardingContexts);

/**
* 发布事件.
* 发布执行事件.
*
* @param jobEvent 作业事件
* @param jobExecutionEvent 作业执行事件
*/
void postJobEvent(JobEvent jobEvent);
void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

/**
* 发布作业状态追踪事件.
*
* @param taskId 作业Id
* @param state 作业执行状态
* @param message 作业执行消息
*/
void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.job.event.JobEventListenerConfigurationException;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Before;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void assertPostJobExecutionEvent() {

@Test
public void assertPostJobStatusTraceEvent() {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(JOB_NAME, "fake_task_id", "fake_slave_id", "READY", "0", State.TASK_RUNNING, "message is empty.");
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(JOB_NAME, "fake_task_id", "fake_slave_id", "READY", "0", Source.LITE_EXECUTOR, State.TASK_RUNNING, "message is empty.");
jobEventBus.post(jobStatusTraceEvent);
verify(repository, atMost(1)).addJobStatusTraceEvent(jobStatusTraceEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent.ExecutionSource;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Before;
Expand Down Expand Up @@ -53,7 +54,8 @@ public void assertAddJobExecutionEvent() throws SQLException {

@Test
public void assertAddJobStatusTraceEvent() throws SQLException {
assertTrue(storage.addJobStatusTraceEvent(new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", "READY", "0", State.TASK_RUNNING, "message is empty.")));
assertTrue(storage.addJobStatusTraceEvent(new JobStatusTraceEvent("test_job", "fake_task_id", "fake_slave_id", "READY", "0",
Source.LITE_EXECUTOR, State.TASK_RUNNING, "message is empty.")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package com.dangdang.ddframe.job.lite.internal.schedule;

import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEvent;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.dangdang.ddframe.job.exception.JobExecutionEnvironmentException;
import com.dangdang.ddframe.job.executor.JobFacade;
import com.dangdang.ddframe.job.executor.ShardingContexts;
Expand Down Expand Up @@ -165,7 +166,13 @@ public void afterJobExecuted(final ShardingContexts shardingContexts) {
}

@Override
public void postJobEvent(final JobEvent jobEvent) {
jobEventBus.post(jobEvent);
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
jobEventBus.post(jobExecutionEvent);
}

@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ public void assertEligibleForJobRunningWhenNotJobPausedManuallyAndNotNeedShardin
}

@Test
public void assertPostJobEvent() {
liteJobFacade.postJobEvent(null);
public void assertPostJobExecutionEvent() {
liteJobFacade.postJobExecutionEvent(null);
verify(eventBus).post(null);
}
}

0 comments on commit f9bf288

Please sign in to comment.