Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SelectiveConsumer #176

Merged
merged 13 commits into from
Dec 24, 2020
4 changes: 1 addition & 3 deletions lib/amqp/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ defmodule AMQP.Application do

@impl true
def start(_type, _args) do
children = [
{AMQP.Channel.ReceiverManager, []}
]
children = []

opts = [strategy: :one_for_one, name: AMQP.Application]
Supervisor.start_link(children, opts)
Expand Down
52 changes: 24 additions & 28 deletions lib/amqp/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ defmodule AMQP.Basic do
"""

import AMQP.Core
alias AMQP.{Channel, Utils}
alias AMQP.Channel.ReceiverManager
alias AMQP.{Channel, Utils, SelectiveConsumer}

@type error :: {:error, reason :: :blocked | :closing}

Expand Down Expand Up @@ -306,7 +305,7 @@ defmodule AMQP.Basic do
* `{:basic_consume_ok, %{consumer_tag: consumer_tag}}` - Sent when the consumer \
process is registered with Basic.consume. The caller receives the same information \
as the return of Basic.consume;
* `{:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}` - Sent by the \
* `{:basic_cancel, %{consumer_tag: consumer_tag, nowait: nowait}}` - Sent by the \
broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
* `{:basic_cancel_ok, %{consumer_tag: consumer_tag}}` - Sent to the consumer process after a call to Basic.cancel

Expand All @@ -325,22 +324,25 @@ defmodule AMQP.Basic do
* `:exclusive` - If set, requests exclusive consumer access, meaning that only
this consumer can consume from the given `queue`. Note that the client cannot
have exclusive access to a queue that already has consumers.
* `:no_wait` - If set, the consume operation is asynchronous. Defaults to
* `:nowait` - If set, the consume operation is asynchronous. Defaults to
`false`.
* `:arguments` - A list of arguments to pass when consuming (of type `t:AMQP.arguments/0`).
See the README for more information. Defaults to `[]`.

"""
@spec consume(Channel.t(), String.t(), pid | nil, keyword) :: {:ok, String.t()} | error
def consume(%Channel{} = chan, queue, consumer_pid \\ nil, options \\ []) do
nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false)
consumer_tag = Keyword.get(options, :consumer_tag, "")

basic_consume =
basic_consume(
queue: queue,
consumer_tag: Keyword.get(options, :consumer_tag, ""),
no_local: Keyword.get(options, :no_local, false),
no_ack: Keyword.get(options, :no_ack, false),
exclusive: Keyword.get(options, :exclusive, false),
nowait: Keyword.get(options, :no_wait, false),
nowait: nowait,
arguments: Keyword.get(options, :arguments, [])
)

Expand All @@ -350,31 +352,29 @@ defmodule AMQP.Basic do
# https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_selective_consumer.erl
#
# It acts like a broker and distributes the messages to the process registered with :amqp_channel.subscribe/3.
# AMQP also provides another broker (Receiver/ReceiverManager) that transfors a message from Erlang record
# AMQP also provides another broker (DirectConsumer/SelectiveConsumer) that transfors a message from Erlang record
# to Elixir friendly type and forwards the message to the process passed to this method.
#
# [RabbitMQ] -> [Channel = SelectiveConsumer] -> [AMQP.Channel.Receiver] -> [consumer_pid]
# [RabbitMQ] -> [Channel] -> [SelectiveConsumer] -> [consumer_pid]
#
# If custom_consumer is set when the channel is open, the message handling is up to the consumer implementation.
#
# [RabbitMQ] -> [channel.custom_consumer] -> ???
#
pid =
case chan.custom_consumer do
nil ->
%{pid: pid} =
ReceiverManager.register_handler(chan.pid, consumer_pid || self(), :consume)

pid
{SelectiveConsumer, _} ->
consumer_pid || self()

_ ->
# when channel has a custom consumer, leave it to handle the given pid with `#handle_consume` callback.
consumer_pid
end

case :amqp_channel.subscribe(chan.pid, basic_consume, pid) do
basic_consume_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag}
error -> {:error, error}
case {nowait, :amqp_channel.subscribe(chan.pid, basic_consume, pid)} do
{true, :ok} -> {:ok, consumer_tag}
{_, basic_consume_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag}
{_, error} -> {:error, error}
end
end

Expand All @@ -390,18 +390,17 @@ defmodule AMQP.Basic do

## Options

* `:no_wait` - If set, the cancel operation is asynchronous. Defaults to
`false`.

* `:nowait` - If set, the cancel operation is asynchronous. Defaults to `false`.
"""
@spec cancel(Channel.t(), String.t(), keyword) :: {:ok, String.t()} | error
def cancel(%Channel{pid: pid}, consumer_tag, options \\ []) do
basic_cancel =
basic_cancel(consumer_tag: consumer_tag, nowait: Keyword.get(options, :no_wait, false))
nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false)
basic_cancel = basic_cancel(consumer_tag: consumer_tag, nowait: nowait)

case :amqp_channel.call(pid, basic_cancel) do
basic_cancel_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag}
error -> {:error, error}
case {nowait, :amqp_channel.call(pid, basic_cancel)} do
{true, :ok} -> {:ok, consumer_tag}
{_, basic_cancel_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag}
{_, error} -> {:error, error}
end
end

Expand All @@ -411,9 +410,8 @@ defmodule AMQP.Basic do
The registered process will receive `{:basic_return, payload, meta}` tuples.
"""
@spec return(Channel.t(), pid) :: :ok
def return(%Channel{pid: pid}, return_handler_pid) do
receiver = ReceiverManager.register_handler(pid, return_handler_pid, :return)
:amqp_channel.register_return_handler(pid, receiver.pid)
def return(%Channel{} = chan, return_handler_pid) do
:amqp_channel.call_consumer(chan.pid, {:register_return_handler, chan, return_handler_pid})
end

@doc """
Expand All @@ -422,8 +420,6 @@ defmodule AMQP.Basic do
"""
@spec cancel_return(Channel.t()) :: :ok
def cancel_return(%Channel{pid: pid}) do
# Currently we don't remove the receiver.
# The receiver will be deleted automatically when channel is closed.
:amqp_channel.unregister_return_handler(pid)
end
end
4 changes: 2 additions & 2 deletions lib/amqp/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule AMQP.Channel do
Functions to operate on Channels.
"""

alias AMQP.{Connection, Channel}
alias AMQP.{Connection, Channel, SelectiveConsumer}

defstruct [:conn, :pid, :custom_consumer]
@type t :: %Channel{conn: Connection.t(), pid: pid, custom_consumer: custom_consumer() | nil}
Expand All @@ -18,7 +18,7 @@ defmodule AMQP.Channel do
custom consumer.
"""
@spec open(Connection.t(), custom_consumer | nil) :: {:ok, Channel.t()} | {:error, any}
def open(%Connection{} = conn, custom_consumer \\ nil) do
def open(%Connection{} = conn, custom_consumer \\ {SelectiveConsumer, self()}) do
do_open_channel(conn, custom_consumer)
end

Expand Down
219 changes: 0 additions & 219 deletions lib/amqp/channel/receiver.ex

This file was deleted.

Loading