Skip to content

Commit

Permalink
Merge pull request #172 from haljin/add-direct-consumer-support
Browse files Browse the repository at this point in the history
Add direct consumer support
  • Loading branch information
ono authored Oct 5, 2020
2 parents 4f8fa15 + 2c8b137 commit e63d923
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 12 deletions.
15 changes: 12 additions & 3 deletions lib/amqp/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,20 @@ defmodule AMQP.Basic do
arguments: Keyword.get(options, :arguments, [])
)

consumer_pid = consumer_pid || self()
pid =
case chan.custom_consumer do
nil ->
%{pid: pid} =
ReceiverManager.register_handler(chan.pid, consumer_pid || self(), :consume)

receiver = ReceiverManager.register_handler(chan.pid, consumer_pid, :consume)
pid

case :amqp_channel.subscribe(chan.pid, basic_consume, receiver.pid) do
_ ->
# when channel has a custom consumer, leave it to handle the given pid with `#handle_consume` callback.
consumer_pid
end

case :amqp_channel.subscribe(chan.pid, basic_consume, pid) do
basic_consume_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag}
error -> {:error, error}
end
Expand Down
36 changes: 28 additions & 8 deletions lib/amqp/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ defmodule AMQP.Channel do

alias AMQP.{Connection, Channel}

defstruct [:conn, :pid]
@type t :: %Channel{conn: Connection.t(), pid: pid}
defstruct [:conn, :pid, :custom_consumer]
@type t :: %Channel{conn: Connection.t(), pid: pid, custom_consumer: custom_consumer() | nil}
@type custom_consumer :: {module(), args :: any()}

@doc """
Opens a new Channel in a previously opened Connection.
Allows optionally to pass a `t:custom_consumer/0` to start a custom consumer implementation. The
consumer must implement the `:amqp_gen_consumer` behavior from `:amqp_client`. See
`:amqp_connection.open_channel/2` for more details and `AMQP.DirectConsumer` for an example of a
custom consumer.
"""
@spec open(Connection.t()) :: {:ok, Channel.t()} | {:error, any}
def open(%Connection{pid: pid} = conn) do
case :amqp_connection.open_channel(pid) do
{:ok, chan_pid} -> {:ok, %Channel{conn: conn, pid: chan_pid}}
error -> error
end
@spec open(Connection.t(), custom_consumer | nil) :: {:ok, Channel.t()} | {:error, any}
def open(%Connection{} = conn, custom_consumer \\ nil) do
do_open_channel(conn, custom_consumer)
end

@doc """
Expand All @@ -29,4 +32,21 @@ defmodule AMQP.Channel do
error -> {:error, error}
end
end

defp do_open_channel(conn, nil) do
case :amqp_connection.open_channel(conn.pid) do
{:ok, chan_pid} -> {:ok, %Channel{conn: conn, pid: chan_pid}}
error -> error
end
end

defp do_open_channel(conn, custom_consumer) do
case :amqp_connection.open_channel(conn.pid, custom_consumer) do
{:ok, chan_pid} ->
{:ok, %Channel{conn: conn, pid: chan_pid, custom_consumer: custom_consumer}}

error ->
error
end
end
end
149 changes: 149 additions & 0 deletions lib/amqp/direct_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
defmodule AMQP.DirectConsumer do
@moduledoc """
`AMQP.DirectConsumer` is an example custom consumer. It's argument is a pid of a process which the channel is
meant to forward the messages to.
When using the `DirectConsumer` the channel will forward the messages directly to the specified pid, as well as monitor
that pid, so that when it exits the channel will exit as well (closing the channel).
## Usage
iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})
This will forward all the messages from the channel to the calling process.
This is an Elixir reimplementation of `:amqp_direct_consumer`. ( https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl)
For more information see: https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation
"""
import AMQP.Core
@behaviour :amqp_gen_consumer

#########################################################
### amqp_gen_consumer callbacks
#########################################################

@impl true
def init(consumer) do
_ref = Process.monitor(consumer)
{:ok, consumer}
end

@impl true
def handle_consume(basic_consume(), _pid, consumer) do
# silently discard
{:ok, consumer}
end

@impl true
def handle_consume_ok(basic_consume_ok(consumer_tag: consumer_tag), _args, consumer) do
_ = send(consumer, {:basic_consume_ok, %{consumer_tag: consumer_tag}})
{:ok, consumer}
end

@impl true
def handle_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do
_ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}})
{:ok, consumer}
end

@impl true
def handle_cancel_ok(basic_cancel_ok(consumer_tag: consumer_tag), _args, consumer) do
_ = send(consumer, {:basic_cancel_ok, %{consumer_tag: consumer_tag}})
{:ok, consumer}
end

@impl true
def handle_server_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do
_ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}})
{:ok, consumer}
end

@impl true
def handle_deliver(
basic_deliver(
consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key
),
amqp_msg(
props:
p_basic(
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
delivery_mode: delivery_mode,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id
),
payload: payload
),
consumer
) do
send(
consumer,
{:basic_deliver, payload,
%{
consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key,
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
persistent: delivery_mode == 2,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id
}}
)

{:ok, consumer}
end

@impl true
def handle_deliver(basic_deliver(), _args, _ctx, _consumer) do
# there's no support for direct connection
# this callback implementation should be added with library support
{:error, :undefined}
end

@impl true
def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
{:ok, consumer}
end

def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do
{:error, {:consumer_died, info}, consumer}
end

def handle_info(down = {:DOWN, _, _, _, _}, consumer) do
_ = send(consumer, down)
{:ok, consumer}
end

@impl true
def handle_call(_req, _from, consumer) do
{:reply, {:error, :undefined}, consumer}
end

@impl true
def terminate(_reason, consumer), do: consumer
end
2 changes: 1 addition & 1 deletion test/channel/receiver_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule AMQP.Channel.ReceiverTest do
:ok = Channel.close(chan)
end

test "closes the receiver when all handers are cancelled", meta do
test "closes the receiver when all handlers are cancelled", meta do
{:ok, chan} = Channel.open(meta.conn)

{:ok, consumer_tag} = Basic.consume(chan, meta.queue)
Expand Down
111 changes: 111 additions & 0 deletions test/direct_consumer_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
defmodule DirectConsumerTest do
use ExUnit.Case

alias AMQP.{Basic, Connection, Channel, Queue}

setup do
{:ok, conn} = Connection.open()
test = self()
receiver_pid = spawn(fn -> simple_receiver(test) end)
{:ok, chan} = Channel.open(conn, {AMQP.DirectConsumer, receiver_pid})

on_exit(fn ->
:ok = Connection.close(conn)
send(receiver_pid, :stop)
end)

{:ok, conn: conn, chan: chan}
end

def simple_receiver(pid) do
receive do
:stop ->
:ok

m ->
send(pid, m)
simple_receiver(pid)
end
end

test "basic publish to default exchange", meta do
assert :ok = Basic.publish(meta[:chan], "", "", "ping")
end

test "basic return", meta do
:ok = Basic.return(meta[:chan], self())

exchange = ""
routing_key = "non-existent-queue"
payload = "payload"

Basic.publish(meta[:chan], exchange, routing_key, payload, mandatory: true)

assert_receive {:basic_return, ^payload,
%{routing_key: ^routing_key, exchange: ^exchange, reply_text: "NO_ROUTE"}}

:ok = Basic.cancel_return(meta[:chan])

Basic.publish(meta[:chan], exchange, routing_key, payload, mandatory: true)

refute_receive {:basic_return, _payload, _properties}
end

describe "basic consume" do
setup meta do
{:ok, %{queue: queue}} = Queue.declare(meta[:chan])

on_exit(fn ->
Queue.delete(meta[:chan], queue)
end)

{:ok, Map.put(meta, :queue, queue)}
end

test "consumer receives :basic_consume_ok message", meta do
{:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue])
assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}}
{:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag)
end

test "consumer receives :basic_deliver message", meta do
{:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue])

payload = "foo"
correlation_id = "correlation_id"
exchange = ""
routing_key = meta[:queue]

Basic.publish(meta[:chan], exchange, routing_key, payload, correlation_id: correlation_id)

assert_receive {:basic_deliver, ^payload,
%{
consumer_tag: ^consumer_tag,
correlation_id: ^correlation_id,
routing_key: ^routing_key
}}

{:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag)
end

test "consumer receives :basic_cancel_ok message", meta do
{:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue])
{:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag)

assert_receive {:basic_cancel_ok, %{consumer_tag: ^consumer_tag}}
end

test "consumer receives :basic_cancel message", meta do
{:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue])
{:ok, _} = Queue.delete(meta[:chan], meta[:queue])

assert_receive {:basic_cancel, %{consumer_tag: ^consumer_tag}}
end

test "cancel returns {:ok, consumer_tag}", meta do
{:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue])

assert {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag)
end
end
end

0 comments on commit e63d923

Please sign in to comment.