Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add direct consumer support #169

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lib/amqp/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,17 @@ defmodule AMQP.Basic do

consumer_pid = consumer_pid || self()

receiver = ReceiverManager.register_handler(chan.pid, consumer_pid, :consume)
pid =
case chan.consumer_type do
:selective ->
%{pid: pid} = ReceiverManager.register_handler(chan.pid, consumer_pid, :consume)
pid

case :amqp_channel.subscribe(chan.pid, basic_consume, receiver.pid) do
:direct ->
consumer_pid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to subscribe direct consumer pid? Isn't it subscribed automatically when channel is opened?

Copy link
Contributor Author

@sircinek sircinek Jul 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU, amqp_channel.subscribe/3 starts a consumer for a queue that is bound to specific channel (and shares its lifetime (or until cancelled)), with opts specified in #basic_consume{}. The pid is in fact ignored by our (and rabbitmq) implementation of direct consumer, but the server method needs to be sent anyway.
Basically we can pass any pid there, but it doesn't make sense to start extra receiver as it will not get any messages.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to subscribe a channel when you have a direct consumer for it?

end

case :amqp_channel.subscribe(chan.pid, basic_consume, pid) do
basic_consume_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag}
error -> {:error, error}
end
Expand Down
43 changes: 35 additions & 8 deletions lib/amqp/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,28 @@ defmodule AMQP.Channel do

alias AMQP.{Connection, Channel}

defstruct [:conn, :pid]
@type t :: %Channel{conn: Connection.t(), pid: pid}
defstruct [:conn, :pid, :consumer_type]
@type t :: %Channel{conn: Connection.t(), pid: pid, consumer_type: consumer_type}
@type consumer_type :: :direct | :selective
@type opts :: [consumer_type: consumer_type, consumer_module: module, consumer_init_args: any]

@doc """
Opens a new Channel in a previously opened Connection.

## Options
* `:consumer_type` - specifies the type of consumer used as callback module.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be :default or :custom as currently :direct option lets you pass anything there.
It could also be more restrictive and allow only default callback module (DirectReceiver) implementation - which was my original intention - then this control flag had more sense. @ono what do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:default and :direct might be my preference. I am not comfortable with :custom sets DirectReceiver as a default consumer. :direct makes more sense to me. we can introduce :custom in future if required as an additional option.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other option is to eliminate this option and take {consumer_module, args} as a tuple like Erlang library does. Then make the following changes:

  • Rename DirectReciver to DirectConsumer
  • Add a proper documentation to DirectConsumer module

Then the user can use it simply like this:

chan = AMQP.Channel.open(conn) # call :amqp_connection.open_channel/1
chan = AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()}) # call :amqp_connection.open_channel/2

This approach makes more sense to me as direct or non-direct is not a right concern on channel layer. What do you think?

Copy link
Contributor Author

@sircinek sircinek Jul 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do the latter, as it is better aligned with how Erlang library works.
Side note - maybe we would like to change Receiver to implement amqp_gen_consumer and be a callback module substituting default amqp_selective_receiver as it more or less does the same thing ? As a separate PR ofc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main purpose of Receiver is...

  • to convert the data from Erlang library(Record) to Elixir friendly data type(Map)
  • to receive messages for a channel in a single process so we can ensure the order of messages (deliver, returns etc.)

Now... I am a bit lost of the purpose of this PR 😓

To align 100%, could you write down an example code with direct_consumer and how it solves the problems?

available options: `:selective` or `:direct`. Defaults to `:selective`;
* `:consumer_module` - specifies consumer callback module. Used only with `:consumer_type: :direct` \
ignored otherwise. Defaults to `AMQP.Channel.DirectReceiver`;
* `:consumer_init_args` - arguments that will be passed to `init/1` function of
consumer callback module. Used only with `:consumer_type: :direct` ignored otherwise. \
If used with default `:consumer_module` it expects `t:Process.dest/0` of process \
receiving AMQP messages. Defaults to caller `pid`.
"""
@spec open(Connection.t()) :: {:ok, Channel.t()} | {:error, any}
def open(%Connection{pid: pid} = conn) do
case :amqp_connection.open_channel(pid) do
{:ok, chan_pid} -> {:ok, %Channel{conn: conn, pid: chan_pid}}
error -> error
end
@spec open(Connection.t(), opts) :: {:ok, Channel.t()} | {:error, any}
def open(%Connection{} = conn, opts \\ []) do
consumer_type = Keyword.get(opts, :consumer_type, :selective)
do_open_channel(conn, consumer_type, opts)
end

@doc """
Expand All @@ -29,4 +39,21 @@ defmodule AMQP.Channel do
error -> {:error, error}
end
end

defp do_open_channel(conn, type = :selective, _opts) do
case :amqp_connection.open_channel(conn.pid) do
{:ok, chan_pid} -> {:ok, %Channel{consumer_type: type, conn: conn, pid: chan_pid}}
error -> error
end
end

defp do_open_channel(conn, type = :direct, opts) do
consumer_module = Keyword.get(opts, :consumer_module, AMQP.Channel.DirectReceiver)
consumer_init_args = Keyword.get(opts, :consumer_init_args, self())

case :amqp_connection.open_channel(conn.pid, {consumer_module, consumer_init_args}) do
{:ok, chan_pid} -> {:ok, %Channel{consumer_type: type, conn: conn, pid: chan_pid}}
error -> error
end
end
end
139 changes: 139 additions & 0 deletions lib/amqp/channel/direct_receiver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
defmodule AMQP.Channel.DirectReceiver do
@moduledoc false

# Direct consumer callback module that implements `:amqp_gen_consumer` behavior.
# Based on `amqp_direct_consumer.erl`.
# For more information see: https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl .

import AMQP.Core
@behaviour :amqp_gen_consumer

#########################################################
### amqp_gen_consumer callbacks
#########################################################

@doc false
def init(consumer) do
_ref = Process.monitor(consumer)
{:ok, consumer}
end

@doc false
def handle_consume(basic_consume(), _pid, consumer) do
# silently discard
{:ok, consumer}
end

@doc false
def handle_consume_ok(basic_consume_ok(consumer_tag: consumer_tag), _args, consumer) do
_ = send(consumer, {:basic_consume_ok, %{consumer_tag: consumer_tag}})
{:ok, consumer}
end

@doc false
def handle_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do
_ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}})
{:ok, consumer}
end

@doc false
def handle_cancel_ok(basic_cancel_ok(consumer_tag: consumer_tag), _args, consumer) do
_ = send(consumer, {:basic_cancel_ok, %{consumer_tag: consumer_tag}})
{:ok, consumer}
end

@doc false
def handle_server_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do
_ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}})
{:ok, consumer}
end

@doc false
def handle_deliver(
basic_deliver(
consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key
),
amqp_msg(
props:
p_basic(
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
delivery_mode: delivery_mode,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id
),
payload: payload
),
consumer
) do
send(
consumer,
{:basic_deliver, payload,
%{
consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key,
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
persistent: delivery_mode == 2,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id
}}
)

{:ok, consumer}
end

@doc false
def handle_deliver(basic_deliver(), _args, _ctx, _consumer) do
# there's no support for direct connection
# this callback implementation should be added with library support
{:error, :undefined}
end

@doc false
def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
{:ok, consumer}
end

def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do
{:error, {:consumer_died, info}, consumer}
end

def handle_info(down = {:DOWN, _, _, _, _}, consumer) do
_ = send(consumer, down)
{:ok, consumer}
end

@doc false
def handle_call(_req, _from, consumer) do
{:reply, {:error, :undefined}, consumer}
end

@doc false
def terminate(_reason, consumer), do: consumer
end
Loading