Skip to content

Commit

Permalink
For #186.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Nov 29, 2016
1 parent 68b2e28 commit 2b4a527
Show file tree
Hide file tree
Showing 30 changed files with 128 additions and 574 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler;
import com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler;
import com.dangdang.ddframe.job.util.json.GsonFactory;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;

import java.util.LinkedHashMap;
Expand All @@ -31,10 +33,11 @@
*
* @author zhangliang
*/
@RequiredArgsConstructor
@AllArgsConstructor
@NoArgsConstructor
public final class JobProperties {

private final Map<JobPropertiesEnum, String> map = new LinkedHashMap<>(JobPropertiesEnum.values().length, 1);
private Map<JobPropertiesEnum, String> map = new LinkedHashMap<>(JobPropertiesEnum.values().length, 1);

/**
* 设置作业属性.
Expand Down
1 change: 1 addition & 0 deletions elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ weight=1
### 结构调整

1. [ISSUE #184](https://github.com/dangdangdotcom/elastic-job/issues/184) ExecutorServiceHandler接口方法调整,增加jobName区分用来区分不同作业线程名
1. [ISSUE #186](https://github.com/dangdangdotcom/elastic-job/issues/186) 去除Spring命名空间DTO相关代码,简化SpringJobScheduler使用

## 2.0.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.example.job.dataflow.SpringDataflowJob;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.schedule.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
Expand All @@ -50,8 +49,7 @@ public DataflowJob dataflowJob() {
@Bean(initMethod = "init")
public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration,
new ElasticJobListener[0]);
return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.example.job.simple.SpringSimpleJob;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.schedule.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
Expand All @@ -50,8 +49,7 @@ public SimpleJob simpleJob() {
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration,
new ElasticJobListener[0]);
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package com.dangdang.ddframe.job.lite.spring.namespace.parser.common;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.executor.handler.JobProperties.JobPropertiesEnum;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.schedule.SpringJobScheduler;
import com.google.common.base.Strings;
import org.springframework.beans.factory.config.BeanDefinition;
Expand All @@ -29,30 +33,32 @@
import org.springframework.util.xml.DomUtils;
import org.w3c.dom.Element;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.CLASS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.CRON_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DESCRIPTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISABLED_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.EVENT_TRACE_RDB_DATA_SOURCE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.EXECUTOR_SERVICE_HANDLER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.FAILOVER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_EXCEPTION_HANDLER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_PARAMETER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_SHARDING_STRATEGY_CLASS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MAX_TIME_DIFF_SECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MISFIRE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MONITOR_EXECUTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MONITOR_PORT_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.OVERWRITE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.REGISTRY_CENTER_REF_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.SHARDING_ITEM_PARAMETERS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.SHARDING_TOTAL_COUNT_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.CLASS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.CRON_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.DESCRIPTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.DISABLED_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.EVENT_TRACE_RDB_DATA_SOURCE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.EXECUTOR_SERVICE_HANDLER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.FAILOVER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.JOB_EXCEPTION_HANDLER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.JOB_PARAMETER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.JOB_SHARDING_STRATEGY_CLASS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.MAX_TIME_DIFF_SECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.MISFIRE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.MONITOR_EXECUTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.MONITOR_PORT_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.OVERWRITE_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.REGISTRY_CENTER_REF_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.SHARDING_ITEM_PARAMETERS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.parser.common.BaseJobBeanDefinitionParserTag.SHARDING_TOTAL_COUNT_ATTRIBUTE;

/**
* 基本作业的命名空间解析器.
Expand All @@ -73,7 +79,7 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
factory.addConstructorArgValue(BeanDefinitionBuilder.rootBeanDefinition(element.getAttribute(CLASS_ATTRIBUTE)).getBeanDefinition());
}
factory.addConstructorArgReference(element.getAttribute(REGISTRY_CENTER_REF_ATTRIBUTE));
factory.addConstructorArgValue(createJobConfiguration(element));
factory.addConstructorArgValue(createLiteJobConfiguration(element));
BeanDefinition jobEventConfig = createJobEventConfig(element);
if (null != jobEventConfig) {
factory.addConstructorArgValue(jobEventConfig);
Expand All @@ -82,31 +88,45 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
return factory.getBeanDefinition();
}

protected abstract Class<? extends AbstractJobConfigurationDto> getJobConfigurationDTO();
protected abstract BeanDefinition getJobTypeConfigurationBeanDefinition(final BeanDefinition jobCoreConfigurationBeanDefinition, final Element element);

protected abstract void setPropertiesValue(final Element element, final BeanDefinitionBuilder factory);
private BeanDefinition createLiteJobConfiguration(final Element element) {
return createLiteJobConfigurationBeanDefinition(element, createJobCoreBeanDefinition(element));
}

private BeanDefinition createJobConfiguration(final Element element) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(getJobConfigurationDTO());
String jobName = element.getAttribute(ID_ATTRIBUTE);
factory.addConstructorArgValue(jobName);
factory.addConstructorArgValue(element.getAttribute(CRON_ATTRIBUTE));
factory.addConstructorArgValue(element.getAttribute(SHARDING_TOTAL_COUNT_ATTRIBUTE));
addPropertyValueIfNotEmpty(SHARDING_ITEM_PARAMETERS_ATTRIBUTE, "shardingItemParameters", element, factory);
addPropertyValueIfNotEmpty(JOB_PARAMETER_ATTRIBUTE, "jobParameter", element, factory);
addPropertyValueIfNotEmpty(MONITOR_EXECUTION_ATTRIBUTE, "monitorExecution", element, factory);
addPropertyValueIfNotEmpty(MONITOR_PORT_ATTRIBUTE, "monitorPort", element, factory);
addPropertyValueIfNotEmpty(MAX_TIME_DIFF_SECONDS_ATTRIBUTE, "maxTimeDiffSeconds", element, factory);
addPropertyValueIfNotEmpty(FAILOVER_ATTRIBUTE, "failover", element, factory);
addPropertyValueIfNotEmpty(MISFIRE_ATTRIBUTE, "misfire", element, factory);
addPropertyValueIfNotEmpty(JOB_SHARDING_STRATEGY_CLASS_ATTRIBUTE, "jobShardingStrategyClass", element, factory);
addPropertyValueIfNotEmpty(DESCRIPTION_ATTRIBUTE, "description", element, factory);
addPropertyValueIfNotEmpty(DISABLED_ATTRIBUTE, "disabled", element, factory);
addPropertyValueIfNotEmpty(OVERWRITE_ATTRIBUTE, "overwrite", element, factory);
addPropertyValueIfNotEmpty(EXECUTOR_SERVICE_HANDLER_ATTRIBUTE, "executorServiceHandler", element, factory);
addPropertyValueIfNotEmpty(JOB_EXCEPTION_HANDLER_ATTRIBUTE, "jobExceptionHandler", element, factory);
setPropertiesValue(element, factory);
return factory.getBeanDefinition();
private BeanDefinition createLiteJobConfigurationBeanDefinition(final Element element, final BeanDefinition jobCoreBeanDefinition) {
BeanDefinitionBuilder result = BeanDefinitionBuilder.rootBeanDefinition(LiteJobConfiguration.class);
result.addConstructorArgValue(getJobTypeConfigurationBeanDefinition(jobCoreBeanDefinition, element));
result.addConstructorArgValue(element.getAttribute(MONITOR_EXECUTION_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(MAX_TIME_DIFF_SECONDS_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(MONITOR_PORT_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(JOB_SHARDING_STRATEGY_CLASS_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(DISABLED_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(OVERWRITE_ATTRIBUTE));
return result.getBeanDefinition();
}

private BeanDefinition createJobCoreBeanDefinition(final Element element) {
BeanDefinitionBuilder jobCoreBeanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(JobCoreConfiguration.class);
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(ID_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(CRON_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(SHARDING_TOTAL_COUNT_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(SHARDING_ITEM_PARAMETERS_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(JOB_PARAMETER_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(FAILOVER_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(MISFIRE_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(element.getAttribute(DESCRIPTION_ATTRIBUTE));
jobCoreBeanDefinitionBuilder.addConstructorArgValue(createJobPropertiesBeanDefinition(element));
return jobCoreBeanDefinitionBuilder.getBeanDefinition();
}

private BeanDefinition createJobPropertiesBeanDefinition(final Element element) {
BeanDefinitionBuilder result = BeanDefinitionBuilder.rootBeanDefinition(JobProperties.class);
Map<JobPropertiesEnum, String> map = new LinkedHashMap<>(JobPropertiesEnum.values().length, 1);
map.put(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER, element.getAttribute(EXECUTOR_SERVICE_HANDLER_ATTRIBUTE));
map.put(JobPropertiesEnum.JOB_EXCEPTION_HANDLER, element.getAttribute(JOB_EXCEPTION_HANDLER_ATTRIBUTE));
result.addConstructorArgValue(map);
return result.getBeanDefinition();
}

private BeanDefinition createJobEventConfig(final Element element) {
Expand Down Expand Up @@ -138,13 +158,6 @@ private List<BeanDefinition> createJobListeners(final Element element) {
return result;
}

private void addPropertyValueIfNotEmpty(final String attributeName, final String propertyName, final Element element, final BeanDefinitionBuilder factory) {
String attributeValue = element.getAttribute(attributeName);
if (!Strings.isNullOrEmpty(attributeValue)) {
factory.addPropertyValue(propertyName, attributeValue);
}
}

@Override
protected boolean shouldGenerateId() {
return true;
Expand Down
Loading

0 comments on commit 2b4a527

Please sign in to comment.