Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling shutdown reason in direct_consumer, and clean up consumer afterwards. #184

Merged
merged 3 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions lib/amqp/direct_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,104 +24,128 @@ defmodule AMQP.DirectConsumer do
#########################################################

@impl true
def init(consumer) do
_ref = Process.monitor(consumer)
{:ok, consumer}
def init({pid, options}) do
_ref = Process.monitor(pid)
ignore_shutdown = Keyword.get(options, :ignore_shutdown, false)

{:ok, %{consumer: pid, ignore_shutdown: ignore_shutdown}}
end

def init(pid), do: init({pid, []})

@impl true
def handle_consume(basic_consume(), _pid, consumer) do
def handle_consume(basic_consume(), _pid, state) do
# silently discard
{:ok, consumer}
{:ok, state}
end

@impl true
def handle_consume_ok(method, _args, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_consume_ok(method, _args, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_cancel(method, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_cancel(method, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_cancel_ok(method, _args, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_cancel_ok(method, _args, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_server_cancel(method, consumer) do
send(consumer, compose_message(method))
{:ok, consumer}
def handle_server_cancel(method, state) do
send(state.consumer, compose_message(method))

{:ok, state}
end

@impl true
def handle_deliver(method, message, consumer) do
send(consumer, compose_message(method, message))
def handle_deliver(_method, _msg, %{consumer: nil} = _state) do
{:error, :no_consumer, nil}
end

{:ok, consumer}
@impl true
def handle_deliver(method, message, state) do
send(state.consumer, compose_message(method, message))

{:ok, state}
end

@impl true
def handle_deliver(_, _args, _ctx, %{consumer: nil} = _state) do
{:error, :no_consumer, nil}
end

@impl true
def handle_deliver(basic_deliver(), _args, _ctx, _consumer) do
def handle_deliver(basic_deliver(), _args, _ctx, _state) do
# there's no support for direct connection
# this callback implementation should be added with library support
{:error, :undefined}
end

@impl true
def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do
{:ok, consumer}
def handle_info({:DOWN, _mref, :process, state, reason}, %{ignore_shutdown: true} = _state)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't want to drop the pattern match for consumer pid here and other places.

when reason in [:normal, :shutdown] do
{:ok, Map.put(state, :consumer, nil)}
end

def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do
{:error, {:consumer_died, info}, consumer}
def handle_info({:DOWN, _mref, :process, state, :normal}, state) do
{:ok, state}
end

def handle_info(down = {:DOWN, _, _, _, _}, consumer) do
send(consumer, down)
{:ok, consumer}
def handle_info({:DOWN, _mref, :process, state, info}, state) do
{:error, {:consumer_died, info}, state}
end

def handle_info(down = {:DOWN, _, _, _, _}, state) do
send(state.consumer, down)

{:ok, state}
end

def handle_info({basic_return() = method, message}, consumer) do
send(consumer, compose_message(method, message))
def handle_info({basic_return() = method, message}, state) do
send(state.consumer, compose_message(method, message))

{:ok, consumer}
{:ok, state}
end

def handle_info(basic_ack() = method, consumer) do
send(consumer, compose_message(method))
def handle_info(basic_ack() = method, state) do
send(state.consumer, compose_message(method))

{:ok, consumer}
{:ok, state}
end

def handle_info(basic_nack() = method, consumer) do
send(consumer, compose_message(method))
def handle_info(basic_nack() = method, state) do
send(state.consumer, compose_message(method))

{:ok, consumer}
{:ok, state}
end

@impl true
def handle_call({:register_return_handler, chan, consumer}, _from, consumer) do
def handle_call({:register_return_handler, chan, _consumer}, _from, state) do
:amqp_channel.register_return_handler(chan.pid, self())

{:reply, :ok, consumer}
{:reply, :ok, state}
end

def handle_call({:register_confirm_handler, chan, consumer}, _from, consumer) do
def handle_call({:register_confirm_handler, chan, _consumer}, _from, state) do
:amqp_channel.register_confirm_handler(chan.pid, self())

{:reply, :ok, consumer}
{:reply, :ok, state}
end

def handle_call(_req, _from, consumer) do
{:reply, {:error, :undefined}, consumer}
def handle_call(_req, _from, state) do
{:reply, {:error, :undefined}, state}
end

@impl true
def terminate(_reason, consumer), do: consumer
def terminate(_reason, state), do: state
end
2 changes: 1 addition & 1 deletion test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule ConnectionTest do

test "open connection with uri, name, and options (deprected but still spported)" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')
Connection.open("amqp://nonexistent:5672", name: "my-connection", host: 'localhost')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intended. Testing a deprecated option for backward compatibility.


assert :ok = Connection.close(conn)
end
Expand Down