Skip to content

Ruby DSL Documentation

Colin Surprenant edited this page Jul 2, 2013 · 11 revisions

DSL usage

Your project can be created in a single file containing all spouts, bolts and topology classes or each classes can be in its own file, your choice. There are many examples for the DSL.

The DSL uses a callback metaphor to attach code to the topology/spout/bolt execution contexts using on_* DSL constructs (ex.: on_submit, on_send, ...). When using on_* you can attach you code in 3 different ways:

  • using a code block
on_receive (:ack => true, :anchor => true) {|tuple| do_something_with(tuple)}

on_receive :ack => true, :anchor => true do |tuple|
  do_something_with(tuple)
end
  • defining the corresponding method
on_receive :ack => true, :anchor => true
def on_receive(tuple)
  do_something_with(tuple)
end
  • defining an arbitrary method
on_receive :my_method, :ack => true, :anchor => true
def my_method(tuple)
  do_something_with(tuple)
end

The example SplitSentenceBolt shows the 3 different coding style.

Topology DSL

Normally Storm topology components are assigned and referenced using numeric ids. In the Topology DSL ids are optional. By default the DSL will use the component class name as an implicit symbolic id and bolt source ids can use these implicit ids. The DSL will automatically resolve and assign numeric ids upon topology submission. If two components are of the same class, creating a conflict, then the id can be explicitly defined using either a numeric value, a symbol or a string. Numeric values will be used as-is at topology submission while symbols and strings will be resolved and assigned a numeric id.

require 'red_storm'

class MyTopology < RedStorm::DSL::Topology

  spout SpoutClass, options do
   output_fields :field1, ...
   set "attribute_name", value
    config_method value
    ...
  end

  bolt BoltClass, options do
    source source_id, grouping
    set "attribute_name", value
    config_method value
    ...
  end

  configure :topology_name do |env|
    set "attribute_name", value
    config_method value
    ...
  end

  on_submit do |env|
    ...
  end
end

spout statement

spout SpoutClass, [optional constructor args], options do
  output_fields :field1, ...
  set "attribute_name", value
  config_method value
  ...
end
  • SpoutClass — spout Ruby class constant
  • [optional constructor args] - if the SpoutClass is a Java class, optional constructor args can be passed in this args array. optional
  • options
    • :id — spout explicit id (default is spout class name)
    • :parallelism — spout parallelism (default is 1)
  • the spout definition body is essentially for configuration options. See Storm Configuration documentation. -output_fields - define the output fields for this spout. alternatively, output_fields can we defined in the spout. optional
    • set — set the valid spout config attribute_name to the given value. The spout code block and config statements are optional.
      • topology.debug
      • topology.max.spout.pending
      • topology.max.task.parallelism
      • topology.kryo.register
    • config_method — the valid spout config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.
      • debug
      • max_spout_pending
      • max_task_parallelism

bolt statement

bolt BoltClass, [optional constructor args], options do
  output_fields :field1, ...
  source source_id, grouping
  set "attribute_name", value
  config_method value
  ...
end
  • BoltClass — bolt Ruby class constant
  • [optional constructor args] - if the BoltClass is a Java class, optional constructor args can be passed in this args array. optional
  • options
    • :id — bolt explicit id (default is bolt class name)
    • :parallelism — bolt parallelism (default is 1)
  • output_fields - define the output fields for this bolt. alternatively, output_fields can we defined in the bolt. optional
  • source - define a new source
    • source_id — source id reference. can be the source class name if unique or the explicit id if defined
    • grouping
      • :fields => ["field", ...] — fieldsGrouping using fields on the source_id
      • :shuffle — shuffleGrouping on the source_id
      • :global — globalGrouping on the source_id
      • :none — noneGrouping on the source_id
      • :all — allGrouping on the source_id
      • :direct — directGrouping on the source_id
  • set and config_method are for configuration options. See Storm Configuration documentation.
    • set — set the valid bolt config attribute_name to the given value. optional.
      • topology.debug
      • topology.max.task.parallelism
      • topology.kryo.register
    • config_method — the valid bolt config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix. optional.
      • debug
      • max_task_parallelism

configure statement

configure :topology_name do |env|
  set "attribute_name", value
  config_method value
  ...
end

The configure statement is required.

  • :topology_name — alternate topology name (default is topology class name)
  • env — is set to :local or :cluster for you to set enviroment specific configurations
  • set and config_method are for Storm configuration options. See Storm Configuration documentation.
    • set — set the valid topology config attribute_name to the given value. optional.
      • topology.debug
      • topology.max.task.parallelism
      • topology.workers
      • ...
    • config_method — the valid topology config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix. optional.
      • debug
      • max_task_parallelism
      • num_workers
      • ...

on_submit statement

on_submit do |env|
  ...
end

The on_submit statement is optional. Use it to execute code after the topology submission.

  • env — is set to :local or :cluster

For example, you can use on_submit to shutdown the LocalCluster after some time. The LocalCluster instance is available usign the cluster method.

on_submit do |env|
  if env == :local
    sleep(5)
    cluster.shutdown
  end
end

Examples

Spout DSL

require 'red_storm'

class MySpout < RedStorm::DSL::Spout
  output_fields :field, ...

  configure do
    set "attribute_name", value
    config_method value
    ...
  end

  on_send options do
    ...
  end

  on_init do
    ...
  end

  on_close do
    ...
  end

  on_ack do |msg_id|
    ...
  end

  on_fail do |msg_id|
    ...
  end

  on_activate do
    ...
  end

  on_deactivate do
    ...
  end
end

output_fields statement

output_fields :field, ...

Define the output fields for this spout. Note that output_fields can alternatively be defined in the topology definition.

  • :field — the field name, can be symbol or string.

configure statement

set "attribute_name", value
config_method value

Set configuration attribute specific for this spout. See Storm Configuration documentation.

  • set — set the valid spout config attribute_name to the given value.
    • topology.debug
    • topology.max.spout.pending
    • topology.max.task.parallelism
    • topology.kryo.register
  • config_method — the valid spout config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.
    • debug
    • max_spout_pending
    • max_task_parallelism

on_send statement

on_send options do
  ...
end

on_send relates to the Java spout nextTuple method and is called periodically by storm to allow the spout to output a tuple. When using auto-emit (default) and unreliable mode (default), the block return value will be auto-emited, with nil as message_id and tuple-tracking will be disabled and on_ack and on_fail will never be called. A single value return will be emited as a single-field tuple. An array of values [a, b] will be emited as a multiple-fields tuple.

If you enable reliable mode and auto-emit, the block return value must be an array and you must supply a message_id as the first element of the array.

When not using auto-emit the reliable_emit(message_id, value, ...) or unreliable_emit(value, ...) methods can be used to emit a single tuple.

Normally a spout should only output a single tuple per on_send invocation.

  • :options
    • :emit — set to false to disable auto-emit (default is true)
    • :reliable — set to true to enable reliable mode when using auto-emit (default is false)

on_init statement

on_init do
  ...
end

on_init relates to the Java spout open method. When on_init is called, the config, context and collector are set to return the Java spout config Map, TopologyContext and SpoutOutputCollector.

on_close statement

on_close do
  ...
end

on_close relates to the Java spout close method.

on_ack statement

on_ack do |msg_id|
  ...
end

on_ack relates to the Java spout ack method.

on_fail statement

on_fail do |msg_id|
  ...
end

on_fail relates to the Java spout fail method.

on_activate statement

on_activate do
  ...
end

on_activate relates to the Java spout activate method.

on_deactivate statement

on_deactivate do
  ...
end

on_deactivate relates to the Java spout deactivate method.

Examples

Bolt DSL

require 'red_storm'

class MyBolt < RedStorm::DSL::Bolt
  output_fields :field, ...

  configure do
    set "attribute_name", value
    config_method value
    ...
  end

  on_receive options do
    ...
  end

  on_init do
    ...
  end

  on_close do
    ...
  end
end

output_fields statement

output_fields :field, ...

Define the output fields for this bolt. Note that output_fields can alternatively be defined in the topology definition.

  • :field — the field name, can be symbol or string.

configure statement

set "attribute_name", value
config_method value

Set configuration attribute specific for this bolt. See Storm Configuration documentation.

  • set — set the valid bolt config attribute_name to the given value.
    • topology.debug
    • topology.max.task.parallelism
    • topology.kryo.register
  • config_method — the valid bolt config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.
    • debug
    • max_task_parallelism

on_receive statement

on_receive options do
  ...
end

on_receive relates to the Java bolt execute method and is called upon tuple reception by Storm. When using auto-emit, the block return value will be auto emited. A single value return will be emited as a single-field tuple. An array of values [a, b] will be emited as a multiple-fields tuple. An array of arrays [ [a, b], [c, d] ] will be emited as multiple-fields multiple tuples.

When not using auto-emit, the unanchored_emit(value, ...) and anchored_emit(tuple, value, ...) methods can be used to emit a single tuple.

When using auto-anchor (disabled by default) the sent tuples will be anchored to the received tuple. When using auto-ack (disabled by default) the received tuple will be ack'ed after emitting the return value. When not using auto-ack, the ack(tuple) and fail(tuple) methods can be used to ack or fail the tuple.

Note that setting auto-ack and auto-anchor is valid only when auto-emit is enabled.

  • :options
    • :emit — set to false to disable auto-emit (default is true)
    • :ack — set to true to enable auto-ack (default is false)
    • :anchor — set to true to enable auto-anchor (default is false)

on_init statement

on_init do
  ...
end

on_init relates to the Java bolt prepare method. When on_init is called, the config, context and collector are set to return the Java spout config Map, TopologyContext and SpoutOutputCollector.

on_close statement

on_close do
  ...
end

on_close relates to the Java bolt cleanup method.

Examples