Scheduling customized interface (0.9.4.1 and below)
Since version 0.9.0, JStorm has provided powerful scheduling features that meet most needs. Before using the custom scheduling interface, it helps to know what JStorm 0.9.0 introduced.
A JStorm resource is no longer just a worker port. It is described along four dimensions: CPU, memory, disk, and network.
By default, each task uses one CPU slot. A task that consumes more CPU can request more CPU slots, as in the following sample:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setCpuSlotsPerTask(totalBoltConf, 3);
totalBolt.addConfigurations(totalBoltConf);
By default, each task uses one memory slot. A task that consumes more memory can request more memory slots, as in the following sample:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setMemSlotPerTask(totalBoltConf, 3);
totalBolt.addConfigurations(totalBoltConf);
By default, a task does not request a disk slot. A task with heavy disk I/O can request disk slots, and more of them as its disk usage grows, as in the following sample:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setTaskAllocDisk(totalBoltConf, true); // request a disk slot
totalBolt.addConfigurations(totalBoltConf);
Once the resources have been allocated successfully, the task can read its assigned disk slot in prepare():
public class TotalCount implements IRichBolt {
// ...
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// ...
String diskSlot = ConfigExtension.getTaskAssignDiskSlot(stormConf);
// ...
}
// ...
}
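To make the slot accounting above concrete, here is a minimal sketch in plain Java with no JStorm dependency. It totals the slots a component requests, assuming the defaults described above (one CPU slot and one memory slot per task, no disk slot unless requested). `SlotDemand` and its methods are hypothetical names used only for illustration, and the rule of one disk slot per task when disk is requested is an assumption, not something the JStorm API confirms.

```java
/** Hypothetical helper, not part of JStorm: totals the slots one component asks for. */
final class SlotDemand {
    final int cpuSlots;
    final int memSlots;
    final int diskSlots;

    private SlotDemand(int cpuSlots, int memSlots, int diskSlots) {
        this.cpuSlots = cpuSlots;
        this.memSlots = memSlots;
        this.diskSlots = diskSlots;
    }

    /**
     * @param parallelism number of tasks of the component
     * @param cpuPerTask  CPU slots requested per task
     * @param memPerTask  memory slots requested per task
     * @param allocDisk   whether each task requests a disk slot
     */
    static SlotDemand forComponent(int parallelism, int cpuPerTask, int memPerTask, boolean allocDisk) {
        if (parallelism < 1 || cpuPerTask < 1 || memPerTask < 1) {
            throw new IllegalArgumentException("parallelism and per-task slots must be at least 1");
        }
        // Assumption: a disk request costs one disk slot per task.
        int disk = allocDisk ? parallelism : 0;
        return new SlotDemand(parallelism * cpuPerTask, parallelism * memPerTask, disk);
    }

    /** Defaults from the text: one CPU slot, one memory slot, no disk slot per task. */
    static SlotDemand withDefaults(int parallelism) {
        return forComponent(parallelism, 1, 1, false);
    }

    public static void main(String[] args) {
        // Four tasks, three CPU slots each, default memory, disk requested.
        SlotDemand d = forComponent(4, 3, 1, true);
        // prints "12 CPU, 4 memory, 4 disk"
        System.out.println(d.cpuSlots + " CPU, " + d.memSlots + " memory, " + d.diskSlots + " disk");
    }
}
```

A calculation like this is handy when checking whether a cluster has enough free slots before raising a component's parallelism.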
You can force the tasks of a component to run on different nodes:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setTaskOnDifferentNode(totalBoltConf, true); // request that the tasks run on different nodes
totalBolt.addConfigurations(totalBoltConf);
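The effect of this constraint can be pictured with a small standalone sketch that gives every task of a component its own node. `DifferentNodePlacement` is a hypothetical class and is not JStorm's scheduler. In particular, reporting failure when there are fewer nodes than tasks is an assumption, since the text does not say how JStorm handles that case.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration, not JStorm's scheduler: at most one task per node. */
final class DifferentNodePlacement {

    /**
     * Maps each task id to its own host, in the order given.
     *
     * @return task id to hostname, or empty when there are fewer distinct hosts than tasks
     */
    static Optional<Map<Integer, String>> place(List<Integer> taskIds, List<String> hosts) {
        Set<String> distinctHosts = new LinkedHashSet<>(hosts); // drop duplicate hostnames
        if (taskIds.size() > distinctHosts.size()) {
            return Optional.empty(); // the constraint cannot be satisfied
        }
        Map<Integer, String> placement = new LinkedHashMap<>();
        Iterator<String> nextHost = distinctHosts.iterator();
        for (Integer taskId : taskIds) {
            placement.put(taskId, nextHost.next());
        }
        return Optional.of(placement);
    }

    public static void main(String[] args) {
        // prints "Optional[{1=node-a, 2=node-b, 3=node-c}]"
        System.out.println(place(Arrays.asList(1, 2, 3),
                Arrays.asList("node-a", "node-b", "node-c", "node-d")));
        // prints "Optional.empty" because two nodes cannot host three tasks separately
        System.out.println(place(Arrays.asList(1, 2, 3), Arrays.asList("node-a", "node-b")));
    }
}
```

The practical consequence is that a component's parallelism should not exceed the number of available nodes when this option is enabled.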
For small applications, you can force all tasks of a topology to run on the same node to save network I/O:
Config.setNumAckers(conf, ackerParal);
ConfigExtension.setUseSingleNode(conf, true);
StormSubmitter.submitTopology(streamName, conf, builder.createTopology());
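Running on a single node only works if one node has room for the whole topology. The sketch below illustrates that check in plain Java. `SingleNodeFit` and `Node` are hypothetical names, and choosing the first node that fits is an assumption. The text does not describe how JStorm picks the node or what it does when no node is large enough.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration, not JStorm's scheduler: fit a whole topology on one node. */
final class SingleNodeFit {

    /** Free slots on one node. */
    static final class Node {
        final String host;
        final int freeCpu;
        final int freeMem;

        Node(String host, int freeCpu, int freeMem) {
            this.host = host;
            this.freeCpu = freeCpu;
            this.freeMem = freeMem;
        }
    }

    /** Returns the first node whose free slots cover the topology's total demand, if any. */
    static Optional<String> findHost(List<Node> nodes, int totalCpu, int totalMem) {
        for (Node n : nodes) {
            if (n.freeCpu >= totalCpu && n.freeMem >= totalMem) {
                return Optional.of(n.host);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("node-a", 2, 2), new Node("node-b", 6, 8));
        // A topology needing 5 CPU and 5 memory slots only fits on node-b.
        System.out.println(findHost(nodes, 5, 5)); // prints "Optional[node-b]"
    }
}
```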
In some cases you may want to assign the tasks of a component to a specific port on a specific machine. If the specified port is already occupied or the machine's resources are insufficient, Nimbus falls back to the default allocation algorithm.
Nimbus allocates tasks in the following order:
- User-defined task assignment has the highest priority. If the requested resources cannot be satisfied, the task is passed to the next level of the allocation algorithm.
- Historical task assignment comes next and is used only if the option to reuse the previous assignment is enabled. If the resources cannot be satisfied, the task is passed to the next level of the allocation algorithm.
- The default resource-balancing algorithm comes last. It computes a weight from the remaining resources of each supervisor and assigns the task to the supervisor with the highest weight.
The following sample requests a user-defined assignment:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
List<ResourceAssignment> userDefineAssignments = new ArrayList<ResourceAssignment>();
for (int i = 0, base = 150; i < boltParal; i++, base++) {
ResourceAssignment assign = new ResourceAssignment();
assign.setCpuSlotNum(2);
assign.setMemSlotNum(2);
assign.setPort(6800 + i);
assign.setHostname("free-56-151.shucang.alipay.net");
userDefineAssignments.add(assign);
}
ConfigExtension.setUserDefineAssignment(totalBoltConf, userDefineAssignments); // request the user-defined assignment
totalBolt.addConfigurations(totalBoltConf);
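The three allocation levels described above can be sketched as a simple fallback chain. This is a standalone model in plain Java, not JStorm's implementation: `AllocationChain`, `Supervisor`, and `assign` are hypothetical names, and the weight used in the last level (the sum of remaining CPU and memory slots) is an assumption, because the text does not give the actual weight formula.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

/** Hypothetical model of the three allocation levels; none of these types are JStorm classes. */
final class AllocationChain {

    /** Free resources on one supervisor. */
    static final class Supervisor {
        final String host;
        final TreeSet<Integer> freePorts;
        int freeCpu;
        int freeMem;

        Supervisor(String host, int freeCpu, int freeMem, Integer... ports) {
            this.host = host;
            this.freeCpu = freeCpu;
            this.freeMem = freeMem;
            this.freePorts = new TreeSet<>(Arrays.asList(ports));
        }

        boolean hasRoom(int cpu, int mem) {
            return !freePorts.isEmpty() && freeCpu >= cpu && freeMem >= mem;
        }

        /** Assumed weight: the sum of remaining slots. The text does not give the real formula. */
        int weight() {
            return freeCpu + freeMem;
        }

        /** Marks the port and slots as used and returns "host:port". */
        String take(int port, int cpu, int mem) {
            freePorts.remove(port);
            freeCpu -= cpu;
            freeMem -= mem;
            return host + ":" + port;
        }
    }

    /** Takes exactly host:port if that port is free and the supervisor has enough slots. */
    private static Optional<String> tryExact(List<Supervisor> sups, String host, Integer port,
                                             int cpu, int mem) {
        if (host == null || port == null) {
            return Optional.empty();
        }
        for (Supervisor s : sups) {
            if (s.host.equals(host) && s.freePorts.contains(port) && s.hasRoom(cpu, mem)) {
                return Optional.of(s.take(port, cpu, mem));
            }
        }
        return Optional.empty();
    }

    static Optional<String> assign(List<Supervisor> sups, int cpu, int mem,
                                   String customHost, Integer customPort,
                                   String oldHost, Integer oldPort) {
        // Level 1: the user-defined assignment has the highest priority.
        Optional<String> slot = tryExact(sups, customHost, customPort, cpu, mem);
        if (slot.isPresent()) {
            return slot;
        }
        // Level 2: reuse the previous assignment (pass nulls to model the option being off).
        slot = tryExact(sups, oldHost, oldPort, cpu, mem);
        if (slot.isPresent()) {
            return slot;
        }
        // Level 3: default balancing, choose the supervisor with the highest weight.
        Supervisor best = null;
        for (Supervisor s : sups) {
            if (s.hasRoom(cpu, mem) && (best == null || s.weight() > best.weight())) {
                best = s;
            }
        }
        if (best == null) {
            return Optional.empty();
        }
        return Optional.of(best.take(best.freePorts.first(), cpu, mem));
    }

    public static void main(String[] args) {
        List<Supervisor> sups = Arrays.asList(
                new Supervisor("host-a", 4, 4, 6800, 6801),
                new Supervisor("host-b", 8, 8, 6800));
        // The requested slot is free, so level 1 succeeds: prints "Optional[host-a:6800]"
        System.out.println(assign(sups, 2, 2, "host-a", 6800, null, null));
        // The same slot is now taken, so level 3 picks host-b: prints "Optional[host-b:6800]"
        System.out.println(assign(sups, 2, 2, "host-a", 6800, null, null));
    }
}
```

Each level either returns a slot or hands the request to the next one, which is why a user-defined assignment that cannot be honored still ends up scheduled somewhere as long as any supervisor has room.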
A task can also reuse the assignment from its last successful run: whatever resources the task was allocated last time are requested again. If those resources are now occupied or insufficient, the default allocation algorithm is used.
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setUseOldAssignment(totalBoltConf, true);
totalBolt.addConfigurations(totalBoltConf);
// To apply the setting to the whole topology, set it on the topology configuration (conf) instead
Config.setNumAckers(conf, ackerParal);
ConfigExtension.setUseOldAssignment(conf, true);
StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());