Skip to content

Commit

Permalink
Handling shutdown reason in direct_consumer, and clean up consumer af…
Browse files Browse the repository at this point in the history
…terwards. (#184)

* Handling shutdown reason in direct_consumer, and clean up consumer.
* Introduced ignore_shutdown flag to the state of the direct_consumer, and fixed deprecation warning in a test case.
* Tiny formatting to make it uniform
  • Loading branch information
benonymus authored Mar 1, 2021
1 parent a9a3159 commit 3250cf2
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 45 deletions.
112 changes: 68 additions & 44 deletions lib/amqp/direct_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,104 +24,128 @@ defmodule AMQP.DirectConsumer do
#########################################################

@impl true
def init(consumer) do
_ref = Process.monitor(consumer)
{:ok, consumer}
def init({pid, options}) do
_ref = Process.monitor(pid)
ignore_shutdown = Keyword.get(options, :ignore_shutdown, false)

{:ok, %{consumer: pid, ignore_shutdown: ignore_shutdown}}
end

def init(pid), do: init({pid, []})

@impl true
def handle_consume(basic_consume(), _pid, consumer) do
def handle_consume(basic_consume(), _pid, state) do
# silently discard
{:ok, consumer}
{:ok, state}
end

@impl true
def handle_consume_ok(method, _args, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_consume_ok(method, _args, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_cancel(method, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_cancel(method, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_cancel_ok(method, _args, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_cancel_ok(method, _args, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_server_cancel(method, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_server_cancel(method, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_deliver(method, message, consumer) do
send(consumer, compose_message(method, message))
def handle_deliver(_method, _msg, %{consumer: nil} = _state) do
{:error, :no_consumer, nil}
end

{:ok, consumer}
@impl true
def handle_deliver(method, message, state) do
send(state.consumer, compose_message(method, message))

{:ok, state}
end

@impl true
def handle_deliver(_, _args, _ctx, %{consumer: nil} = _state) do
{:error, :no_consumer, nil}
end

@impl true
def handle_deliver(basic_deliver(), _args, _ctx, _consumer) do
def handle_deliver(basic_deliver(), _args, _ctx, _state) do
# there's no support for direct connection
# this callback implementation should be added with library support
{:error, :undefined}
end

@impl true
def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
{:ok, consumer}
def handle_info({:DOWN, _mref, :process, state, reason}, %{ignore_shutdown: true} = _state)
when reason in [:normal, :shutdown] do
{:ok, Map.put(state, :consumer, nil)}
end

def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do
{:error, {:consumer_died, info}, consumer}
def handle_info({:DOWN, _mref, :process, state, :normal}, state) do
{:ok, state}
end

def handle_info(down = {:DOWN, _, _, _, _}, consumer) do
send(consumer, down)
{:ok, consumer}
def handle_info({:DOWN, _mref, :process, state, info}, state) do
{:error, {:consumer_died, info}, state}
end

def handle_info(down = {:DOWN, _, _, _, _}, state) do
send(state.consumer, down)

{:ok, state}
end

def handle_info({basic_return() = method, message}, consumer) do
send(consumer, compose_message(method, message))
def handle_info({basic_return() = method, message}, state) do
send(state.consumer, compose_message(method, message))

{:ok, consumer}
{:ok, state}
end

def handle_info(basic_ack() = method, consumer) do
send(consumer, compose_message(method))
def handle_info(basic_ack() = method, state) do
send(state.consumer, compose_message(method))

{:ok, consumer}
{:ok, state}
end

def handle_info(basic_nack() = method, consumer) do
send(consumer, compose_message(method))
def handle_info(basic_nack() = method, state) do
send(state.consumer, compose_message(method))

{:ok, consumer}
{:ok, state}
end

@impl true
def handle_call({:register_return_handler, chan, consumer}, _from, consumer) do
def handle_call({:register_return_handler, chan, _consumer}, _from, state) do
:amqp_channel.register_return_handler(chan.pid, self())

{:reply, :ok, consumer}
{:reply, :ok, state}
end

def handle_call({:register_confirm_handler, chan, consumer}, _from, consumer) do
def handle_call({:register_confirm_handler, chan, _consumer}, _from, state) do
:amqp_channel.register_confirm_handler(chan.pid, self())

{:reply, :ok, consumer}
{:reply, :ok, state}
end

def handle_call(_req, _from, consumer) do
{:reply, {:error, :undefined}, consumer}
def handle_call(_req, _from, state) do
{:reply, {:error, :undefined}, state}
end

@impl true
def terminate(_reason, consumer), do: consumer
def terminate(_reason, state), do: state
end
2 changes: 1 addition & 1 deletion test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule ConnectionTest do

test "open connection with uri, name, and options (deprecated but still supported)" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')
Connection.open("amqp://nonexistent:5672", name: "my-connection", host: 'localhost')

assert :ok = Connection.close(conn)
end
Expand Down

0 comments on commit 3250cf2

Please sign in to comment.