From 3250cf2339b68d514af182983a2367b54b1e4025 Mon Sep 17 00:00:00 2001 From: Bence Janos Toth Date: Mon, 1 Mar 2021 19:33:13 +0100 Subject: [PATCH] Handling shutdown reason in direct_consumer, and clean up consumer afterwards. (#184) * Handling shutdown reason in direct_consumer, and clean up consumer. * Introduced ignore_shutdown flag to the state of the direct_consumer, and fixed deprecation warning in a test case. * Tiny formatting to make it uniform --- lib/amqp/direct_consumer.ex | 112 ++++++++++++++++++++++-------------- test/connection_test.exs | 2 +- 2 files changed, 69 insertions(+), 45 deletions(-) diff --git a/lib/amqp/direct_consumer.ex b/lib/amqp/direct_consumer.ex index 254dd1d..25910dc 100644 --- a/lib/amqp/direct_consumer.ex +++ b/lib/amqp/direct_consumer.ex @@ -24,104 +24,128 @@ defmodule AMQP.DirectConsumer do ######################################################### @impl true - def init(consumer) do - _ref = Process.monitor(consumer) - {:ok, consumer} + def init({pid, options}) do + _ref = Process.monitor(pid) + ignore_shutdown = Keyword.get(options, :ignore_shutdown, false) + + {:ok, %{consumer: pid, ignore_shutdown: ignore_shutdown}} end + def init(pid), do: init({pid, []}) + @impl true - def handle_consume(basic_consume(), _pid, consumer) do + def handle_consume(basic_consume(), _pid, state) do # silently discard - {:ok, consumer} + {:ok, state} end @impl true - def handle_consume_ok(method, _args, consumer) do - send(consumer, compose_message(method)) - {:ok, consumer} + def handle_consume_ok(method, _args, state) do + send(state.consumer, compose_message(method)) + + {:ok, state} end @impl true - def handle_cancel(method, consumer) do - send(consumer, compose_message(method)) - {:ok, consumer} + def handle_cancel(method, state) do + send(state.consumer, compose_message(method)) + + {:ok, state} end @impl true - def handle_cancel_ok(method, _args, consumer) do - send(consumer, compose_message(method)) - {:ok, consumer} + def handle_cancel_ok(method, _args, state) do + send(state.consumer, compose_message(method)) + + {:ok, state} end @impl true - def handle_server_cancel(method, consumer) do - send(consumer, compose_message(method)) - {:ok, consumer} + def handle_server_cancel(method, state) do + send(state.consumer, compose_message(method)) + + {:ok, state} end @impl true - def handle_deliver(method, message, consumer) do - send(consumer, compose_message(method, message)) + def handle_deliver(_method, _msg, %{consumer: nil} = _state) do + {:error, :no_consumer, nil} + end - {:ok, consumer} + @impl true + def handle_deliver(method, message, state) do + send(state.consumer, compose_message(method, message)) + + {:ok, state} + end + + @impl true + def handle_deliver(_, _args, _ctx, %{consumer: nil} = _state) do + {:error, :no_consumer, nil} end @impl true - def handle_deliver(basic_deliver(), _args, _ctx, _consumer) do + def handle_deliver(basic_deliver(), _args, _ctx, _state) do # there's no support for direct connection # this callback implementation should be added with library support {:error, :undefined} end @impl true - def handle_info({:DOWN, _mref, :process, consumer, :normal}, consumer) do - {:ok, consumer} + def handle_info({:DOWN, _mref, :process, state, reason}, %{ignore_shutdown: true} = _state) + when reason in [:normal, :shutdown] do + {:ok, Map.put(state, :consumer, nil)} end - def handle_info({:DOWN, _mref, :process, consumer, info}, consumer) do - {:error, {:consumer_died, info}, consumer} + def handle_info({:DOWN, _mref, :process, state, :normal}, state) do + {:ok, state} end - def handle_info(down = {:DOWN, _, _, _, _}, consumer) do - send(consumer, down) - {:ok, consumer} + def handle_info({:DOWN, _mref, :process, state, info}, state) do + {:error, {:consumer_died, info}, state} + end + + def handle_info(down = {:DOWN, _, _, _, _}, state) do + send(state.consumer, down) + + {:ok, state} end - def handle_info({basic_return() = method, message}, consumer) do - send(consumer, compose_message(method, message)) + def handle_info({basic_return() = method, message}, state) do + send(state.consumer, compose_message(method, message)) - {:ok, consumer} + {:ok, state} end - def handle_info(basic_ack() = method, consumer) do - send(consumer, compose_message(method)) + def handle_info(basic_ack() = method, state) do + send(state.consumer, compose_message(method)) - {:ok, consumer} + {:ok, state} end - def handle_info(basic_nack() = method, consumer) do - send(consumer, compose_message(method)) + def handle_info(basic_nack() = method, state) do + send(state.consumer, compose_message(method)) - {:ok, consumer} + {:ok, state} end @impl true - def handle_call({:register_return_handler, chan, consumer}, _from, consumer) do + def handle_call({:register_return_handler, chan, _consumer}, _from, state) do :amqp_channel.register_return_handler(chan.pid, self()) - {:reply, :ok, consumer} + {:reply, :ok, state} end - def handle_call({:register_confirm_handler, chan, consumer}, _from, consumer) do + def handle_call({:register_confirm_handler, chan, _consumer}, _from, state) do :amqp_channel.register_confirm_handler(chan.pid, self()) - {:reply, :ok, consumer} + {:reply, :ok, state} end - def handle_call(_req, _from, consumer) do - {:reply, {:error, :undefined}, consumer} + def handle_call(_req, _from, state) do + {:reply, {:error, :undefined}, state} end @impl true - def terminate(_reason, consumer), do: consumer + def terminate(_reason, state), do: state end diff --git a/test/connection_test.exs b/test/connection_test.exs index 0e056b3..aa53a29 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -61,7 +61,7 @@ defmodule ConnectionTest do test "open connection with uri, name, and options (deprecated but still supported)" do assert {:ok, conn} = - Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost') + Connection.open("amqp://nonexistent:5672", name: "my-connection", host: 'localhost') assert :ok = Connection.close(conn) end