An AMQP adapter for Conduit.
This package can be installed as:
-
Add
conduit_amqp
to your list of dependencies inmix.exs
:def deps do [{:conduit_amqp, "~> 0.6.3"}] end
-
Ensure
conduit_amqp
is started before your application:def application do [applications: [:conduit_amqp]] end
# config/config.exs
config :my_app, MyApp.Broker,
adapter: ConduitAMQP,
url: "amqp://my_app:[email protected]"
# Stop lager redirecting :error_logger messages
config :lager, :error_logger_redirect, false
# Stop lager removing Logger's :error_logger handler
config :lager, :error_logger_whitelist, [Logger.ErrorHandler]
For the full set of options, see ConduitAQMP.
You can define exchanges with the exchange
macro in the
configure
block of your Broker. The exchange
macro accepts
the name of the exchange and options for the exchange.
:type
- Either:topic
,:fanout
,:direct
, or:headers
. Defaults to:topic
.:durable
- If set, keeps the Exchange between restarts of the broker. Defaults tofalse
.:auto_delete
- If set, deletes the Exchange once all queues unbind from it. Defaults tofalse
.:passive
- If set, returns an error if the Exchange does not already exist. Defaults tofalse
.:internal
- If set, the exchange may not be used directly by publishers. Defaults tofalse
.
See exchange.declare for more details.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
configure do
exchange "my.topic", type: :topic, durable: true
end
end
You can define queues with the queue
macro in the
configure
block of your Broker. The queue
macro accepts
the name of the queue and options for the exchange.
:durable
- If set, keeps the Queue between restarts of the broker. Defaults tofalse
.:auto_delete
- If set, deletes the Queue once all subscribers disconnect. Defaults tofalse
.:exclusive
- If set, only one subscriber can consume from the Queue. Defaults tofalse
.:passive
- If set, raises an error unless the queue already exists. Defaults tofalse
.:from
- A list of routing keys to bind the queue to.:exchange
- Name of the exchange used to bind the queue to the routing keys.
See queue.declare for more details.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
configure do
queue "my.queue", from: ["#.created.user"], exchange: "amq.topic", durable: true
end
end
Inside an incoming
block for a broker, you can define subscriptions to queues. Conduit will route messages on those
queues to your subscribers.
defmodule MyApp.Broker do
incoming MyApp do
subscribe :my_subscriber, MySubscriber, from: "my.queue"
subscribe :my_other_subscriber, MyOtherSubscriber,
from: "my.other.queue",
prefetch_size: 20
end
end
:from
- Accepts a string or function that resolves to the queue to consume from. Defaults to the name of the route if not specified.:prefetch_size
- Size of prefetch buffer in octets. Defaults to0
, which means no specific limit. This can also be configured globally by passing this same option when configuring your Broker.:prefetch_count
- Number of messages to prefetch. Defaults to0
, which means no specific limit. This can also be configured globally by passing this same option when configuring your Broker.:consumer_tag
- Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.:no_local
- If the no-local field is set the server will not send messages to the connection that published them. Defaults tofalse
.:no_ack
- If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. Defaults tofalse
.:exclusive
- Request exclusive consumer access, meaning only this consumer can access the queue. Defaults tofalse
.:nowait
- If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception. Defaults tofalse
.:arguments
- A set of arguments for the consume. Defaults to[]
.
Note: It's highly recommended to set :prefetch_size
or :prefetch_count
to a non-zero value to limit the memory consumed when a queue is backed up.
See basic.qos and basic.consume for more details on options.
Inside an outgoing
block for a broker, you can define publications to exchanges. Conduit will deliver messages using the
options specified. You can override these options, by passing different options to your broker's publish/3
.
defmodule MyApp.Broker do
outgoing do
publish :destination_route,
to: "my.routing_key",
exchange: "amq.topic"
publish :other_destination_route,
to: "my.other.routing_key",
exchange: "amq.topic"
end
end
:to
- The routing key for the message. If the message already has its destination set, this option will be ignored.:exchange
- The exchange to publish to. This option is required.
:publisher_confirms
- This configures publisher confirms. Should be one of:no_confirmation
(the default),:wait
(if it should just return an:timeout
atom on failure) or:die
(raises an exception on timeout).:publisher_confirms_timeout
- The timeout in milliseconds for the server acknowledgement. Should be:infinity
for no timeout (the default) or an integer number of milliseconds.
See basic.publish for more details.
%Message{}
|> put_body(%{"my" => "message"})
|> Broker.publish(:destination_route)
When ConduitAMQP is used as an adapter for Conduit, it starts ConduitAMQP as a child supervisor. ConduitAMQP starts:
- ConduitAQMP.ConnPool - Creates and supervises a pool of AMQP connections.
- ConduitAMQP.PubSub - Creates and supervises ConduitAMQP.PubPool and ConduitAMQP.SubPool.
- ConduitAMQP.Subscribers - A supervisor for subscribers that process messages.