Liebre means "hare" in Spanish.
This is a gem that handles RabbitMQ consumers, publishers and RPCs, based on bunny.
- It lets you create classes that are invoked every time a message is received in their subscribed queue.
- It handles RPCs as a special Consumer where a callback message is returned to the exchange.
- You can use its Publisher to send messages to an exchange.
- There is also a Publisher RPC method to send a message and wait for its response.
It is based on 2 config files:
- rabbitmq.yml (default path config/rabbitmq.yml): it contains the RabbitMQ connection configurations and can be environment dependent. It must contain at least the default connection.
- without environment set:
default:
  :host: localhost
  :port: 5672
  :user: guest
  :pass: guest
  :vhost: /
  :threaded: true
  :heartbeat: 2
- with environment set:
development:
  default:
    :host: localhost
    :port: 5672
    :user: guest
    :pass: guest
    :vhost: /
    :threaded: true
    :heartbeat: 2
  rpc:
    :host: localhost
    :port: 5672
    :user: guest
    :pass: guest
    :vhost: rpc
    :threaded: true
    :heartbeat: 2
test:
  default:
    :host: localhost
    :port: 5672
    :user: guest
    :pass: guest
    :vhost: /
    :threaded: true
    :heartbeat: 2
  rpc:
    :host: localhost
    :port: 5672
    :user: guest
    :pass: guest
    :vhost: rpc
    :threaded: true
    :heartbeat: 2
production:
  ...
- liebre.yml (default path config/liebre.yml):
rpc_request_timeout: 5
consumers:
  some_consumer:
    class_name: MyConsumer
    rpc: false
    pool_size: 1
    num_threads: 4
    exchange:
      name: "consumer_exchange"
      type: "fanout"
      opts:
        durable: false
    queue:
      name: "consumer_queue"
      opts:
        durable: false
    bind:
      routing_key: # a string or an array of strings
        - key_1
        - key_2
  some_rpc:
    class_name: MyRPC
    rpc: true
    connection_name: rpc
    pool_size: 1
    exchange:
      name: "rpc_exchange"
      type: "fanout"
      opts:
        durable: false
    queue:
      name: "rpc_queue"
      opts:
        durable: false
publishers:
  some_publisher:
    exchange:
      name: "consumer_exchange"
      type: "fanout"
      opts:
        durable: false
  rpc_publisher:
    connection_name: rpc
    exchange:
      name: "rpc_exchange"
      type: "fanout"
      opts:
        durable: false
An entry for each consumer in your app. Consumer options:
- class_name (mandatory): the class that will be invoked every time a message is received
- rpc: a flag to specify the consumer behaviour (default false)
- connection_name: the name of the connection to use (default: default)
- pool_size: the pool size of the connection channel (default 1)
- num_threads: number of consumers of the queue (default 1)
- exchange: a hash of options:
  - name: the exchange name
  - type: the exchange type (fanout, direct or topic)
  - opts: options hash to pass to the bunny exchange function
- queue: a hash of options:
  - name: the queue name
  - opts: options hash to pass to the bunny queue function
- bind: a hash of options (optional):
  - routing_key: the binding routing key; it can be a single string or an array of strings
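To make the defaults listed above concrete, here is a minimal sketch of how one consumer entry could be normalized once liebre.yml has been parsed into a hash. `CONSUMER_DEFAULTS`, `normalize_consumer` and the `routing_keys` key are hypothetical names used only for illustration; this is not Liebre's own code.

```ruby
# Hypothetical helper, not part of Liebre: fills in the documented defaults
# for a single consumer entry taken from liebre.yml.
CONSUMER_DEFAULTS = {
  "rpc"             => false,
  "connection_name" => "default",
  "pool_size"       => 1,
  "num_threads"     => 1
}.freeze

def normalize_consumer(entry)
  raise ArgumentError, "class_name is mandatory" unless entry["class_name"]

  config = CONSUMER_DEFAULTS.merge(entry)
  # routing_key may be a single string or an array of strings; Array()
  # turns both forms (and a missing bind section) into an array.
  config["routing_keys"] = Array(entry.dig("bind", "routing_key"))
  config
end

consumer = normalize_consumer({
  "class_name" => "MyConsumer",
  "exchange"   => { "name" => "consumer_exchange", "type" => "fanout" },
  "queue"      => { "name" => "consumer_queue" },
  "bind"       => { "routing_key" => "key_1" }
})

consumer["num_threads"]  # => 1
consumer["routing_keys"] # => ["key_1"]
```

Treating the routing key as an array everywhere means the binding step only has to handle one shape.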
An entry for each exchange you want to publish to. Publisher options:
- connection_name: the name of the connection to use (default: default)
- exchange: a hash of options:
  - name: the exchange name
  - type: the exchange type (fanout, direct or topic)
  - opts: options hash to pass to the bunny exchange function

rpc_request_timeout: sets the timeout for an RPC request
You can change these defaults in an initializer with something like:
Liebre::Config.config_path = "your/custom/path"
Liebre::Config.connection_path = "your/custom/path"
Liebre::Config.env = "production"
Liebre::Config.logger = Logger.new(...)
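The environment-dependent layout of rabbitmq.yml can be illustrated with a small lookup sketch. It assumes the file is plain YAML with symbol keys, as in the examples above; `connection_settings` is a hypothetical helper written for this example and is not how Liebre itself necessarily loads the file.

```ruby
require "yaml"

# Trimmed-down copy of the environment-dependent layout shown earlier.
RABBITMQ_YML = <<~YAML
  development:
    default:
      :host: localhost
      :port: 5672
      :vhost: /
    rpc:
      :host: localhost
      :port: 5672
      :vhost: rpc
YAML

# Hypothetical helper, not Liebre's own loader.
def connection_settings(yaml_text, name: "default", env: nil)
  # The keys are written as Ruby symbols (:host), so Symbol must be permitted.
  config = YAML.safe_load(yaml_text, permitted_classes: [Symbol])
  # With an environment set, the connections live one level deeper.
  config = config.fetch(env) if env
  config.fetch(name)
end

settings = connection_settings(RABBITMQ_YML, name: "rpc", env: "development")
settings[:vhost] # => "rpc"
```

Using `fetch` makes a missing environment or connection name fail loudly with a `KeyError` instead of silently returning `nil`.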
There are 2 different consumer usages: Consumer and RPC.
For a Consumer, you only need to create a class with this simple interface:
class MyConsumer
  def initialize(payload, meta)
    @payload = payload # the content of the message
    @meta = meta       # the meta information (also called properties)
  end

  def call
    # do your stuff here
    # return :ack to acknowledge the message,
    # return :reject to requeue it
    # or return :error to send it to the dead-letter-exchange
    :ack
  end
end
Every time a message is received, a new instance of this class will be created.
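As an illustration of the interface above, here is a sketch of a consumer that acknowledges valid JSON and sends anything else to the dead-letter-exchange, together with a tiny dispatcher that mimics the "new instance per message" behaviour. `JsonConsumer` and `handle_message` are hypothetical names; the dispatcher only stands in for whatever Liebre does internally, so the class can be exercised without a broker.

```ruby
require "json"

# Example consumer following the two-argument interface.
class JsonConsumer
  def initialize(payload, meta)
    @payload = payload
    @meta = meta
  end

  def call
    JSON.parse(@payload)
    :ack
  rescue JSON::ParserError
    # Requeueing a malformed message would only fail again, so dead-letter it.
    :error
  end
end

# Hypothetical dispatcher, for illustration only: builds a fresh instance
# for each message and checks that the result is one of the documented symbols.
def handle_message(consumer_class, payload, meta = {})
  result = consumer_class.new(payload, meta).call
  unless %i[ack reject error].include?(result)
    raise ArgumentError, "unexpected consumer result: #{result.inspect}"
  end
  result
end

handle_message(JsonConsumer, '{"id": 1}') # => :ack
handle_message(JsonConsumer, "not json")  # => :error
```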
For an RPC, you only need to create a class with this simple interface:
class MyRPC
  def initialize(payload, meta, callback)
    @payload = payload   # the content of the message
    @meta = meta         # the meta information (also called properties)
    @callback = callback # a Proc that will publish an answer
  end

  def call
    # do your stuff here
    @callback.call("your response")
  end
end
Every time a message is received, a new instance of this class will be created.
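Because the callback is just a Proc, an RPC class can be exercised without RabbitMQ by passing in a lambda that records the answer. The sketch below assumes nothing beyond the three-argument interface shown above; `UpcaseRPC` is a made-up example class.

```ruby
# Example RPC class following the three-argument interface.
class UpcaseRPC
  def initialize(payload, meta, callback)
    @payload = payload
    @meta = meta
    @callback = callback
  end

  def call
    # The response handed to the callback is what would be published back.
    @callback.call(@payload.upcase)
  end
end

# Stand-in for the Proc that would normally publish the answer.
replies = []
callback = ->(response) { replies << response }

UpcaseRPC.new("ping", {}, callback).call
replies # => ["PING"]
```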
There are 2 ways to publish a message: enqueue and enqueue_and_wait.
# enqueue: publish and return immediately
publisher = Liebre::Publisher.new("some_publisher_name")
publisher.enqueue "hello", :routing_key => "consumer_queue"
publisher.enqueue "bye", :routing_key => "consumer_queue"

# enqueue_and_wait: publish through the RPC publisher and wait for the response
rpc_publisher = Liebre::Publisher.new("rpc_publisher")
response = rpc_publisher.enqueue_and_wait "hello", :routing_key => "consumer_queue"
another_response = rpc_publisher.rpc "bye", :routing_key => "consumer_queue"
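The "send and wait" idea, bounded by a timeout such as rpc_request_timeout, can be sketched with an in-memory queue in place of RabbitMQ. This is only an illustration of the pattern: `wait_for_reply` is a hypothetical helper, the timeout is assumed to be in seconds, and returning `nil` on timeout is a choice made for this sketch rather than documented Liebre behaviour.

```ruby
require "timeout"

# Hypothetical helper, not Liebre's API: blocks until a reply arrives on the
# queue or the timeout (assumed to be in seconds) expires.
def wait_for_reply(reply_queue, timeout_seconds)
  Timeout.timeout(timeout_seconds) { reply_queue.pop }
rescue Timeout::Error
  nil # nobody answered in time
end

replies = Queue.new
# This thread stands in for the RPC consumer publishing its answer.
responder = Thread.new { replies << "pong" }

answer = wait_for_reply(replies, 1)
responder.join
answer # => "pong"

wait_for_reply(Queue.new, 0.1) # => nil
```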
Add the following to your Gemfile:
gem "liebre", "~> 0.1"
In your Rakefile add:
require "liebre/tasks"
Then you just need to run:
rake liebre:run
Alternative way:
require 'liebre'
Liebre::Runner.new.start