Skip to content

Commit

Permalink
fix #107
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Jun 7, 2016
1 parent 6c4d123 commit 7608a7d
Show file tree
Hide file tree
Showing 23 changed files with 55 additions and 39 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,6 @@ public class MyElasticJob extends AbstractSimpleElasticJob {
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置作业-->
<job:simple id="myElasticJob" class="xxx.MyElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<job:simple id="myElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>
```
2 changes: 1 addition & 1 deletion README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ public class MyElasticJob extends AbstractThroughputDataFlowElasticJob<Foo> {
<reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!--configure job -->
<job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>
```
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
* @author zhangliang
*/
final class ConfigurationNode {
public final class ConfigurationNode {

private static final String ROOT = "config";

Expand Down Expand Up @@ -66,7 +66,7 @@ final class ConfigurationNode {

private final JobNodePath jobNodePath;

ConfigurationNode(final String jobName) {
public ConfigurationNode(final String jobName) {
jobNodePath = new JobNodePath(jobName);
}

Expand Down Expand Up @@ -106,7 +106,7 @@ public boolean isFailoverPath(final String path) {
* @param path 节点路径
* @return 是否为作业调度配置路径
*/
boolean isCronPath(final String path) {
public boolean isCronPath(final String path) {
return jobNodePath.getFullPath(CRON).equals(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public final class MonitorServiceEnableTest extends AbstractBaseStdJobTest {
private static final int MONITOR_PORT = 9000;

public MonitorServiceEnableTest() {
super(TestJob.class, -1);
super(TestJob.class, MONITOR_PORT);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public List<Object> fetchData(final JobExecutionSingleShardingContext shardingCo
public int processData(final JobExecutionSingleShardingContext shardingContext, final List<Object> data) {
int result = 0;
for (Object each : data) {
if (jobCaller.processData(each)) {
result++;
try {
if (jobCaller.processData(each)) {
result++;
}
} catch (final NullPointerException ex) {
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public List<Object> fetchData(final JobExecutionMultipleShardingContext sharding
public int processData(final JobExecutionMultipleShardingContext shardingContext, final List<Object> data) {
int result = 0;
for (Object each : data) {
if (jobCaller.processData(each)) {
result++;
try {
if (jobCaller.processData(each)) {
result++;
}
} catch (final NullPointerException ex) {
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public List<Object> fetchData(final JobExecutionSingleShardingContext shardingCo
public int processData(final JobExecutionSingleShardingContext shardingContext, final List<Object> data) {
int result = 0;
for (Object each : data) {
if (jobCaller.processData(each)) {
result++;
try {
if (jobCaller.processData(each)) {
result++;
}
} catch (final NullPointerException ex) {
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public List<Object> fetchData(final JobExecutionMultipleShardingContext sharding
public int processData(final JobExecutionMultipleShardingContext shardingContext, final List<Object> data) {
int result = 0;
for (Object each : data) {
if (jobCaller.processData(each)) {
result++;
try {
if (jobCaller.processData(each)) {
result++;
}
} catch (final NullPointerException ex) {
}
}
return result;
Expand Down
2 changes: 1 addition & 1 deletion elastic-job-doc/content/post/dump.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ weight=13
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置作业-->
<job:simple id="oneOffElasticJob" monitor-port="9888" class="xxx.MyElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<job:simple id="oneOffElasticJob" monitor-port="9888" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>
```

Expand Down
1 change: 1 addition & 0 deletions elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ weight=1
1. [ISSUE #104](https://github.com/dangdangdotcom/elastic-job/issues/104) 移除@Deprecated代码
1. [ISSUE #105](https://github.com/dangdangdotcom/elastic-job/issues/105) 重构Spring命名空间驼峰式定义
1. [ISSUE #106](https://github.com/dangdangdotcom/elastic-job/issues/106) isStreaming配置化
1. [ISSUE #107](https://github.com/dangdangdotcom/elastic-job/issues/107) reg-center更名为registry-center-ref

## 1.0.8

Expand Down
12 changes: 6 additions & 6 deletions elastic-job-doc/content/post/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,16 @@ public class JobMain {
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置简单作业-->
<job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

<!-- 配置数据流作业-->
<job:dataflow id="throughputDataFlow" class="xxx.MyThroughputDataFlowElasticJob" reg-center="reg-center" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" process-count-interval-seconds="10" concurrent-data-process-thread-count="10" />
<job:dataflow id="throughputDataFlow" class="xxx.MyThroughputDataFlowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" process-count-interval-seconds="10" concurrent-data-process-thread-count="10" />

<!-- 配置脚本作业-->
<job:script id="scriptElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
<job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />

<!-- 配置带监听的简单作业-->
<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
<job:listener class="xx.MySimpleJobListener"/>
<job:listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
</job:simple>
Expand All @@ -237,7 +237,7 @@ public class JobMain {
| -----------------------------------|:------|:-------|:----|:---------------------------------------------------------------------------|
|id |String |`` | | 作业名称 |
|class |String || | 作业实现类,需实现`ElasticJob`接口,脚本型作业不需要配置 |
|reg-center |String |`` | | 注册中心`Bean`的引用,需引用`reg:zookeeper`的声明 |
|registry-center-ref |String |`` | | 注册中心`Bean`的引用,需引用`reg:zookeeper`的声明 |
|cron |String |`` | | `cron`表达式,用于配置作业触发时间 |
|sharding-total-count |int |`` | | 作业分片总数 |
|sharding-item-parameters |String || | 分片序列号和参数用等号分隔,多个键值对用逗号分隔<br />分片序列号从`0`开始,不可大于或等于作业分片总数<br />如:<br/>`0=a,1=b,2=c`|
Expand Down Expand Up @@ -286,7 +286,7 @@ job:script命名空间拥有job:simple命名空间的全部属性,以下仅列
| 属性名 |类型 |是否必填|缺省值|描述 |
| ------------------------------ |:------|:------|:----|:--------------------------------------------------------------------------------------------------|
|id |String |`` | | 注册中心在`Spring`容器中的主键 |
|serverLists |String |`` | | 连接`Zookeeper`服务器的列表<br />包括IP地址和端口号<br />多个地址用逗号分隔<br />如: host1:2181,host2:2181|
|server-lists |String |`` | | 连接`Zookeeper`服务器的列表<br />包括IP地址和端口号<br />多个地址用逗号分隔<br />如: host1:2181,host2:2181|
|namespace |String |`` | | `Zookeeper`的命名空间 |
|base-sleep-time-milliseconds |int ||1000 | 等待重试的间隔时间的初始值<br />单位:毫秒 |
|max-sleep-time-milliseconds |int ||3000 | 等待重试的间隔时间的最大值<br />单位:毫秒 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" nested-port="${nestedPort}" nested-data-dir="${nestedDataDir}" />

<job:simple id="simpleElasticJob" class="com.dangdang.example.elasticjob.spring.job.SimpleJobDemo" reg-center="regCenter" sharding-total-count="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" sharding-item-parameters="${simpleJob.shardingItemParameters}" monitor-execution="${simpleJob.monitorExecution}" monitor-port="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" />
<job:dataflow id="throughputDataFlowJob" class="com.dangdang.example.elasticjob.spring.job.ThroughputDataFlowJobDemo" reg-center="regCenter" sharding-total-count="${throughputDataFlowJob.shardingTotalCount}" cron="${throughputDataFlowJob.cron}" sharding-item-parameters="${throughputDataFlowJob.shardingItemParameters}" monitor-execution="${throughputDataFlowJob.monitorExecution}" failover="${throughputDataFlowJob.failover}" process-count-interval-seconds="${throughputDataFlowJob.processCountIntervalSeconds}" concurrent-data-process-thread-count="${throughputDataFlowJob.concurrentDataProcessThreadCount}" description="${throughputDataFlowJob.description}" disabled="${throughputDataFlowJob.disabled}" overwrite="${throughputDataFlowJob.overwrite}" streaming-process="${throughputDataFlowJob.streamingProcess}" />
<job:dataflow id="sequenceDataFlowJob" class="com.dangdang.example.elasticjob.spring.job.SequenceDataFlowJobDemo" reg-center="regCenter" sharding-total-count="${sequenceDataFlowJob.shardingTotalCount}" cron="${sequenceDataFlowJob.cron}" sharding-item-parameters="${sequenceDataFlowJob.shardingItemParameters}" monitor-execution="${sequenceDataFlowJob.monitorExecution}" failover="${sequenceDataFlowJob.failover}" process-count-interval-seconds="${sequenceDataFlowJob.processCountIntervalSeconds}" max-time-diff-seconds="${sequenceDataFlowJob.maxTimeDiffSeconds}" description="${sequenceDataFlowJob.description}" disabled="${sequenceDataFlowJob.disabled}" overwrite="${sequenceDataFlowJob.overwrite}" />
<job:simple id="simpleElasticJob" class="com.dangdang.example.elasticjob.spring.job.SimpleJobDemo" registry-center-ref="regCenter" sharding-total-count="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" sharding-item-parameters="${simpleJob.shardingItemParameters}" monitor-execution="${simpleJob.monitorExecution}" monitor-port="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" />
<job:dataflow id="throughputDataFlowJob" class="com.dangdang.example.elasticjob.spring.job.ThroughputDataFlowJobDemo" registry-center-ref="regCenter" sharding-total-count="${throughputDataFlowJob.shardingTotalCount}" cron="${throughputDataFlowJob.cron}" sharding-item-parameters="${throughputDataFlowJob.shardingItemParameters}" monitor-execution="${throughputDataFlowJob.monitorExecution}" failover="${throughputDataFlowJob.failover}" process-count-interval-seconds="${throughputDataFlowJob.processCountIntervalSeconds}" concurrent-data-process-thread-count="${throughputDataFlowJob.concurrentDataProcessThreadCount}" description="${throughputDataFlowJob.description}" disabled="${throughputDataFlowJob.disabled}" overwrite="${throughputDataFlowJob.overwrite}" streaming-process="${throughputDataFlowJob.streamingProcess}" />
<job:dataflow id="sequenceDataFlowJob" class="com.dangdang.example.elasticjob.spring.job.SequenceDataFlowJobDemo" registry-center-ref="regCenter" sharding-total-count="${sequenceDataFlowJob.shardingTotalCount}" cron="${sequenceDataFlowJob.cron}" sharding-item-parameters="${sequenceDataFlowJob.shardingItemParameters}" monitor-execution="${sequenceDataFlowJob.monitorExecution}" failover="${sequenceDataFlowJob.failover}" process-count-interval-seconds="${sequenceDataFlowJob.processCountIntervalSeconds}" max-time-diff-seconds="${sequenceDataFlowJob.maxTimeDiffSeconds}" description="${sequenceDataFlowJob.description}" disabled="${sequenceDataFlowJob.disabled}" overwrite="${sequenceDataFlowJob.overwrite}" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" nested-port="${nestedPort}" nested-data-dir="${nestedDataDir}" />

<job:simple id="simpleElasticJob" class="com.dangdang.example.elasticjob.spring.job.SimpleJobDemo" reg-center="regCenter" sharding-total-count="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" sharding-item-parameters="${simpleJob.shardingItemParameters}" monitor-execution="${simpleJob.monitorExecution}" monitor-port="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}">
<job:simple id="simpleElasticJob" class="com.dangdang.example.elasticjob.spring.job.SimpleJobDemo" registry-center-ref="regCenter" sharding-total-count="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" sharding-item-parameters="${simpleJob.shardingItemParameters}" monitor-execution="${simpleJob.monitorExecution}" monitor-port="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}">
<job:listener class="com.dangdang.example.elasticjob.spring.job.listener.SimpleListener" />
<job:listener class="com.dangdang.example.elasticjob.spring.job.listener.SimpleOnceListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
</job:simple>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class BaseJobBeanDefinitionParserTag {

public static final String REGISTRY_CENTER_REF_ATTRIBUTE = "registry-center-ref";

public static final String CLASS_ATTRIBUTE = "class";

public static final String CRON_ATTRIBUTE = "cron";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MONITOR_EXECUTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MONITOR_PORT_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.OVERWRITE_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.REGISTRY_CENTER_REF_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.SHARDING_ITEM_PARAMETERS_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.SHARDING_TOTAL_COUNT_ATTRIBUTE;
import static com.dangdang.ddframe.job.spring.namespace.constants.BaseJobBeanDefinitionParserTag.STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
Expand All @@ -64,7 +65,7 @@ public abstract class AbstractJobBeanDefinitionParser extends AbstractBeanDefini
protected AbstractBeanDefinition parseInternal(final Element element, final ParserContext parserContext) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.addConstructorArgReference(element.getAttribute("reg-center"));
factory.addConstructorArgReference(element.getAttribute(REGISTRY_CENTER_REF_ATTRIBUTE));
factory.addConstructorArgReference(createJobConfiguration(element, parserContext));
factory.addConstructorArgValue(createJobListeners(element));
return factory.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(PropertySourcesPlaceholderConfigurer.class);
factory.addPropertyValue("ignoreUnresolvablePlaceholders", true);
BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(RegistryPropertySources.class);
definitionBuilder.addConstructorArgReference(element.getAttribute("registerRef"));
definitionBuilder.addConstructorArgReference(element.getAttribute("registry-center-ref"));
factory.addPropertyValue("propertySources", definitionBuilder.getBeanDefinition());
return factory.getBeanDefinition();
}
Expand Down
Loading

0 comments on commit 7608a7d

Please sign in to comment.