From 381c9095ba70cb84c2a0c194af3abc0c20b8ff30 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sat, 12 Dec 2020 16:06:55 +0000 Subject: [PATCH 01/29] coverage tool --- .gitignore | 1 + mix.exs | 7 +++++-- mix.lock | 10 ++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 5f06f30..ef8bad9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ erl_crash.dump *.ez /log /doc +/cover diff --git a/mix.exs b/mix.exs index 5d27b0e..5a792b2 100644 --- a/mix.exs +++ b/mix.exs @@ -22,7 +22,9 @@ defmodule AMQP.Mixfile do main: "readme", source_ref: "v#{@version}", source_url: "https://github.com/pma/amqp" - ] + ], + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test] ] end @@ -43,7 +45,8 @@ defmodule AMQP.Mixfile do {:inch_ex, "~> 0.5", only: :docs}, # Dev dependencies. - {:dialyxir, "~> 0.5", only: :dev, runtime: false} + {:dialyxir, "~> 0.5", only: :dev, runtime: false}, + {:excoveralls, "~> 0.10", only: :test} ] end diff --git a/mix.lock b/mix.lock index 8cb8a6e..7e91f4c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,18 +1,28 @@ %{ "amqp_client": {:hex, :amqp_client, "3.8.9", "42fb24cfa606f87f77e84604c1ad896c26c236228196a294a07f88f8d2982695", [:make, :rebar3], [{:rabbit_common, "3.8.9", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "8ad70be479d2b8c7882aa2908245b490960fe5c619d2edd56eb24f90e1132b60"}, + "certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "2.2.0", "f8040672ff9644ccaefc40ffb8ec33ed80ac77df84ac5f206a1c079ca2aacc9d", [:rebar3], [], "hexpm", "52dd8585a2123e6e259253a7f4f849f1584404bd25cdfaf18247e198065c601e"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"}, "ex_doc": {:hex, :ex_doc, "0.22.1", "9bb6d51508778193a4ea90fa16eac47f8b67934f33f8271d5e1edec2dc0eee4c", [:mix], [{:earmark, "~> 1.4.0", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "d957de1b75cb9f78d3ee17820733dc4460114d8b1e11f7ee4fd6546e69b1db60"}, + "excoveralls": {:hex, :excoveralls, "0.13.3", "edc5f69218f84c2bf61b3609a22ddf1cec0fbf7d1ba79e59f4c16d42ea4347ed", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cc26f48d2f68666380b83d8aafda0fffc65dafcc8d8650358e0b61f6a99b1154"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"}, + "hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"}, + "idna": {:hex, :idna, "6.0.1", "1d038fb2e7668ce41fbf681d2c45902e52b3cb9e9c77b55334353b222c2ee50c", [:rebar3], [{:unicode_util_compat, "0.5.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a02c8a1c4fd601215bb0b0324c8a6986749f807ce35f25449ec9e69758708122"}, "inch_ex": {:hex, :inch_ex, "0.5.6", "418357418a553baa6d04eccd1b44171936817db61f4c0840112b420b8e378e67", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "7123ca0450686a61416a06cd38e26af18fd0f8c1cff5214770a957c6e0724338"}, + "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "jsx": {:hex, :jsx, "2.11.0", "08154624050333919b4ac1b789667d5f4db166dc50e190c4d778d1587f102ee0", [:rebar3], [], "hexpm", "eed26a0d04d217f9eecefffb89714452556cf90eb38f290a27a4d45b9988f8c0"}, "lager": {:hex, :lager, "3.8.0", "3402b9a7e473680ca179fc2f1d827cab88dd37dd1e6113090c6f45ef05228a1c", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "f6cb541b688eab60730d8d286eb77256a5a9ad06eac10d43beaf55d07e68bbb6"}, "makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"}, + "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"}, "rabbit_common": {:hex, :rabbit_common, "3.8.9", "5a80cb825013ada01921db6f20143295eff8c43284b96a4a40e52576397c9322", [:make, :rebar3], [{:credentials_obfuscation, "2.2.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "2.11.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.8.0", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "e8138d2cf3fcef8e1c37f5e2c0fd39ca052b1fe98d21036508ed58f7edece33d"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "recon": {:hex, :recon, "2.5.1", "430ffa60685ac1efdfb1fe4c97b8767c92d0d92e6e7c3e8621559ba77598678a", [:mix, :rebar3], [], "hexpm", "5721c6b6d50122d8f68cccac712caa1231f97894bab779eff5ff0f886cb44648"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"}, } From 784223b69672f1adbdccc280c4f0901a1b871d25 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sat, 12 Dec 2020 16:23:42 +0000 Subject: [PATCH 02/29] Version 2.0.0-pre.0 --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 5a792b2..33d3d39 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule AMQP.Mixfile do use Mix.Project - @version "1.6.0" + @version "2.0.0-pre.0" def project do [ From 82d656c7dba860ba8e2cd484842df97354da3100 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 24 Dec 2020 11:12:59 +0000 Subject: [PATCH 03/29] mix format --- mix.exs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 33d3d39..bdfb077 100644 --- a/mix.exs +++ b/mix.exs @@ -24,7 +24,12 @@ defmodule AMQP.Mixfile do source_url: "https://github.com/pma/amqp" ], test_coverage: [tool: ExCoveralls], - preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test] + preferred_cli_env: [ + coveralls: :test, + "coveralls.detail": :test, + "coveralls.post": :test, + "coveralls.html": :test + ] ] end From 35a94b1a96e2f9a4c0281900ea40173385fd5dd0 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 24 Dec 2020 11:15:07 +0000 Subject: [PATCH 04/29] Don't cache deps --- .github/workflows/elixir.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 81dc711..e749d73 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -15,13 +15,6 @@ jobs: otp-version: 23.2 elixir-version: 1.11.2 - - name: Restore dependencies cache - uses: actions/cache@v2 - with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- - - name: Install dependencies run: mix deps.get @@ -60,13 +53,6 @@ jobs: otp-version: ${{matrix.otp}} elixir-version: ${{matrix.elixir}} - - name: Restore dependencies cache - uses: actions/cache@v2 - with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- - - name: Install dependencies run: mix deps.get From 4ca7593c0b3fd8e817e53819fecb2d1d4576d020 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 9 Dec 2020 22:52:51 +0000 Subject: [PATCH 05/29] Port :amqp_client's :amqp_selective_consumer to Elixir --- lib/amqp/basic.ex | 5 +- lib/amqp/channel.ex | 4 +- lib/amqp/core.ex | 6 + lib/amqp/selective_consumer.ex | 303 +++++++++++++++++++++++++++++++++ 4 files changed, 315 insertions(+), 3 deletions(-) create mode 100644 lib/amqp/selective_consumer.ex diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 48c3fa3..e208671 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -4,7 +4,7 @@ defmodule AMQP.Basic do """ import AMQP.Core - alias AMQP.{Channel, Utils} + alias AMQP.{Channel, Utils, SelectiveConsumer} alias AMQP.Channel.ReceiverManager @type error :: {:error, reason :: :blocked | :closing} @@ -367,6 +367,9 @@ defmodule AMQP.Basic do pid + {SelectiveConsumer, _} -> + consumer_pid || self() + _ -> # when channel has a custom consumer, leave it to handle the given pid with `#handle_consume` callback. consumer_pid diff --git a/lib/amqp/channel.ex b/lib/amqp/channel.ex index c1d370a..c98880c 100644 --- a/lib/amqp/channel.ex +++ b/lib/amqp/channel.ex @@ -3,7 +3,7 @@ defmodule AMQP.Channel do Functions to operate on Channels. """ - alias AMQP.{Connection, Channel} + alias AMQP.{Connection, Channel, SelectiveConsumer} defstruct [:conn, :pid, :custom_consumer] @type t :: %Channel{conn: Connection.t(), pid: pid, custom_consumer: custom_consumer() | nil} @@ -18,7 +18,7 @@ defmodule AMQP.Channel do custom consumer. """ @spec open(Connection.t(), custom_consumer | nil) :: {:ok, Channel.t()} | {:error, any} - def open(%Connection{} = conn, custom_consumer \\ nil) do + def open(%Connection{} = conn, custom_consumer \\ {SelectiveConsumer, self()}) do do_open_channel(conn, custom_consumer) end diff --git a/lib/amqp/core.ex b/lib/amqp/core.ex index 5c68a42..a7302aa 100644 --- a/lib/amqp/core.ex +++ b/lib/amqp/core.ex @@ -201,6 +201,12 @@ defmodule AMQP.Core do Record.extract(:"basic.nack", from_lib: "rabbit_common/include/rabbit_framing.hrl") ) + Record.defrecord( + :basic_credit_drained, + :"basic.credit_drained", + Record.extract(:"basic.credit_drained", from_lib: "rabbit_common/include/rabbit_framing.hrl") + ) + Record.defrecord( :confirm_select, :"confirm.select", diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex new file mode 100644 index 0000000..9019f0e --- /dev/null +++ b/lib/amqp/selective_consumer.ex @@ -0,0 +1,303 @@ +defmodule AMQP.SelectiveConsumer do + @moduledoc """ + TODO + """ + import AMQP.Core + alias AMQP.SelectiveConsumer + @behaviour :amqp_gen_consumer + + defstruct [consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none] + @type t :: %SelectiveConsumer{ + consumers: %{String.t() => pid}, + unassigned: pid | :undefined, + monitors: %{pid => {integer, reference}}, + default_consumer: pid | :none + } + + @doc """ + Ported from :amqp_selective_consumer.register_default_consumer/2. + + This function registers a default consumer with the channel. + A default consumer is used when a subscription is made via + amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than {@module}:subscribe/3) and + hence there is no consumer pid registered with the consumer tag. In this case, the relevant + deliveries will be sent to the default consumer. + """ + @spec register_default_consumer(pid, pid) :: :ok + def register_default_consumer(channel_pid, consumer_pid) do + :amqp_channel.call_consumer(channel_pid, {:register_default_consumer, consumer_pid}) + end + + @impl true + def init(_state) do + {:ok, %SelectiveConsumer{}} + end + + @impl true + def handle_consume(basic_consume(consumer_tag: tag, nowait: nowait), pid, status) do + result = + case nowait do + true when tag == :undefined or is_nil(tag) or byte_size(tag) == 0 -> + :no_consumer_tag_specified + _ when is_binary(tag) and byte_size(tag) >= 0 -> + case resolve_consumer(tag, status) do + {:consumer, _} -> :consumer_tag_in_use + _ -> :ok + end + _ -> + :ok + end + + case {result, nowait} do + {:ok, true} -> + c = Map.put(status.consumers, tag, pid) + m = add_to_monitors(status.monitors, pid) + {:ok, %{status | consumers: c, monitors: m}} + + {:ok, false} -> + {:ok, %{status | unassigned: pid}} + + {error, true} -> + {:error, error, status}; + + {_error, false} -> + # Don't do anything (don't override existing consumers), the server will close the channel with an error. + {:ok, status} + end + end + + @impl true + def handle_consume_ok(basic_consume_ok(consumer_tag: tag) = consume_ok, _consume, %{unassigned: pid} = status) when is_pid(pid) do + c = Map.put(status.consumers, tag, pid) + m = add_to_monitors(status.monitors, pid) + + status = %{status | consumers: c, monitors: m, unassigned: :undefined} + {:ok, %{status | consumers: c, monitors: m}} + + deliver(consume_ok, status) + + {:ok, status} + end + + @impl true + def handle_cancel(basic_cancel(nowait: true), %{default_consumer: :none}) do + exit(:cancel_nowait_requires_default_consumer); + end + + def handle_cancel(basic_cancel(nowait: nowait) = cancel, status) do + case nowait do + true -> {:ok, do_cancel(cancel, status)} + false -> {:ok, status} + end + end + + defp do_cancel(cancel, status) do + tag = tag(cancel) + case Map.fetch(status.consumers, tag) do + {:ok, consumer} -> + c = Map.delete(status.consumers, tag) + m = remove_from_monitors(status.monitors, consumer) + {:ok, %{status | consumers: c, monitors: m}} + + _error -> + # untracked consumer + status + end + end + + @impl true + def handle_cancel_ok(basic_cancel_ok() = cancel_ok, _cancel, status) do + new_status = do_cancel(cancel_ok, status) + # use old status + deliver(cancel_ok, status) + + {:ok, new_status} + end + + @impl true + def handle_server_cancel(basic_cancel(nowait: true) = cancel, status) do + new_status = do_cancel(cancel, status) + # use old status + deliver(cancel, status) + + {:ok, new_status} + end + + @impl true + def handle_deliver(method, message, status) do + deliver(method, message, status) + {:ok, status} + end + + @impl true + def handle_deliver(method, message, delivery_ctx, status) do + deliver(method, message, delivery_ctx, status) + {:ok, status} + end + + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, status) do + m = Map.delete(status.monitors, pid) + d = if status.default_consumer == pid, do: :none, else: status.default_consumer + c = status.consumers |> Enum.reject(fn {_, v} -> v == pid end) |> Map.new() + + {:ok, %{status | consumers: c, monitors: m, default_consumer: d}} + end + + def handle_info(basic_credit_drained() = method, status) do + # TODO: support composing the message + deliver(method, status) + {:ok, status} + end + + @impl true + def handle_call({:register_default_consumer, pid}, _from, status) do + m = + if is_pid(status.default_consumer) do + remove_from_monitors(status.monitors, status.default_consumer) + else + status.monitors + end + |> add_to_monitors(pid) + + {:reply, :ok, %{status | monitors: m, default_consumer: pid}} + end + + @impl true + def terminate(_reason, status) do + status + end + + defp deliver(method, status) do + deliver(method, :undefined, status) + end + + defp deliver(method, message, status) do + deliver(method, message, :undefined, status) + end + + defp deliver(method, message, delivery_ctx, status) do + tag = tag(method) + composed = compose_message(method, message, delivery_ctx) + deliver_to_consumer_or_die(tag, composed, status) + end + + defp deliver_to_consumer_or_die(tag, message, status) do + case resolve_consumer(tag, status) do + {:consumer, pid} -> send(pid, message) + {:default, pid} -> send(pid, message); + _error -> exit(:unexpected_delivery_and_no_default_consumer) + end + end + + # AMQP original: convert Erlang record to map + defp compose_message(basic_consume_ok() = method, _message, _ctx) do + body = method |> basic_consume_ok() |> Enum.into(%{}) + {:basic_consume_ok, body} + end + + defp compose_message(basic_cancel_ok() = method, _message, _ctx) do + body = method |> basic_cancel_ok() |> Enum.into(%{}) + {:basic_cancel_ok, body} + end + + defp compose_message(basic_cancel() = method, _message, _ctx) do + body = method |> basic_cancel() |> Enum.into(%{}) + {:basic_cancel, body} + end + + defp compose_message( + basic_deliver( + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ), _ctx) do + {:basic_deliver, payload, + %{ + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + # TODO: + # basic_credit_drained, basic_ack, basic_nack etc... + + defp resolve_consumer(tag, %{consumers: consumers, default_consumer: default}) do + case Map.fetch(consumers, tag) do + {:ok, pid} -> + {:consumer, pid} + + :error when is_pid(default) -> + {:default, default} + + _ -> + :error + end + end + + defp add_to_monitors(monitors, pid) do + case Map.fetch(monitors, pid) do + :error -> + Map.put(monitors, pid, {1, :erlang.monitor(:process, pid)}) + {:ok, {count, mref}} -> + Map.put(monitors, pid, {count + 1, mref}) + end + end + + defp remove_from_monitors(monitors, pid) do + case Map.fetch(monitors, pid) do + {:ok, {1, mref}} -> + :erlang.demonitor(mref) + Map.delete(monitors, pid) + + {:ok, {count, mref}} -> + Map.put(monitors, pid, {count - 1, mref}) + end + end + + defp tag(basic_consume(consumer_tag: tag)), do: tag + defp tag(basic_consume_ok(consumer_tag: tag)), do: tag + defp tag(basic_cancel(consumer_tag: tag)), do: tag + defp tag(basic_cancel_ok(consumer_tag: tag)), do: tag + defp tag(basic_deliver(consumer_tag: tag)), do: tag + defp tag(basic_credit_drained(consumer_tag: tag)), do: tag +end From f0ccd4c9f24e107992cfff13384e4e022c18f43c Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sat, 12 Dec 2020 16:02:22 +0000 Subject: [PATCH 06/29] mix format --- lib/amqp/selective_consumer.ex | 36 ++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index 9019f0e..ce660e6 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -6,13 +6,14 @@ defmodule AMQP.SelectiveConsumer do alias AMQP.SelectiveConsumer @behaviour :amqp_gen_consumer - defstruct [consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none] + defstruct consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none + @type t :: %SelectiveConsumer{ - consumers: %{String.t() => pid}, - unassigned: pid | :undefined, - monitors: %{pid => {integer, reference}}, - default_consumer: pid | :none - } + consumers: %{String.t() => pid}, + unassigned: pid | :undefined, + monitors: %{pid => {integer, reference}}, + default_consumer: pid | :none + } @doc """ Ported from :amqp_selective_consumer.register_default_consumer/2. @@ -39,11 +40,13 @@ defmodule AMQP.SelectiveConsumer do case nowait do true when tag == :undefined or is_nil(tag) or byte_size(tag) == 0 -> :no_consumer_tag_specified + _ when is_binary(tag) and byte_size(tag) >= 0 -> case resolve_consumer(tag, status) do {:consumer, _} -> :consumer_tag_in_use _ -> :ok end + _ -> :ok end @@ -58,7 +61,7 @@ defmodule AMQP.SelectiveConsumer do {:ok, %{status | unassigned: pid}} {error, true} -> - {:error, error, status}; + {:error, error, status} {_error, false} -> # Don't do anything (don't override existing consumers), the server will close the channel with an error. @@ -67,7 +70,12 @@ defmodule AMQP.SelectiveConsumer do end @impl true - def handle_consume_ok(basic_consume_ok(consumer_tag: tag) = consume_ok, _consume, %{unassigned: pid} = status) when is_pid(pid) do + def handle_consume_ok( + basic_consume_ok(consumer_tag: tag) = consume_ok, + _consume, + %{unassigned: pid} = status + ) + when is_pid(pid) do c = Map.put(status.consumers, tag, pid) m = add_to_monitors(status.monitors, pid) @@ -81,18 +89,19 @@ defmodule AMQP.SelectiveConsumer do @impl true def handle_cancel(basic_cancel(nowait: true), %{default_consumer: :none}) do - exit(:cancel_nowait_requires_default_consumer); + exit(:cancel_nowait_requires_default_consumer) end def handle_cancel(basic_cancel(nowait: nowait) = cancel, status) do case nowait do - true -> {:ok, do_cancel(cancel, status)} + true -> {:ok, do_cancel(cancel, status)} false -> {:ok, status} end end defp do_cancel(cancel, status) do tag = tag(cancel) + case Map.fetch(status.consumers, tag) do {:ok, consumer} -> c = Map.delete(status.consumers, tag) @@ -185,7 +194,7 @@ defmodule AMQP.SelectiveConsumer do defp deliver_to_consumer_or_die(tag, message, status) do case resolve_consumer(tag, status) do {:consumer, pid} -> send(pid, message) - {:default, pid} -> send(pid, message); + {:default, pid} -> send(pid, message) _error -> exit(:unexpected_delivery_and_no_default_consumer) end end @@ -233,7 +242,9 @@ defmodule AMQP.SelectiveConsumer do cluster_id: cluster_id ), payload: payload - ), _ctx) do + ), + _ctx + ) do {:basic_deliver, payload, %{ consumer_tag: consumer_tag, @@ -278,6 +289,7 @@ defmodule AMQP.SelectiveConsumer do case Map.fetch(monitors, pid) do :error -> Map.put(monitors, pid, {1, :erlang.monitor(:process, pid)}) + {:ok, {count, mref}} -> Map.put(monitors, pid, {count + 1, mref}) end From 584bac8f787ca83145147fb048b047645ba0da0f Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 22 Dec 2020 20:55:44 +0000 Subject: [PATCH 07/29] Support nowait option --- lib/amqp/basic.ex | 30 ++++++------ lib/amqp/selective_consumer.ex | 8 ++-- test/selective_consumer_test.exs | 81 ++++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 test/selective_consumer_test.exs diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index e208671..5029206 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -306,7 +306,7 @@ defmodule AMQP.Basic do * `{:basic_consume_ok, %{consumer_tag: consumer_tag}}` - Sent when the consumer \ process is registered with Basic.consume. The caller receives the same information \ as the return of Basic.consume; - * `{:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}` - Sent by the \ + * `{:basic_cancel, %{consumer_tag: consumer_tag, nowait: nowait}}` - Sent by the \ broker when the consumer is unexpectedly cancelled (such as after a queue deletion) * `{:basic_cancel_ok, %{consumer_tag: consumer_tag}}` - Sent to the consumer process after a call to Basic.cancel @@ -325,7 +325,7 @@ defmodule AMQP.Basic do * `:exclusive` - If set, requests exclusive consumer access, meaning that only this consumer can consume from the given `queue`. Note that the client cannot have exclusive access to a queue that already has consumers. - * `:no_wait` - If set, the consume operation is asynchronous. Defaults to + * `:nowait` - If set, the consume operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when consuming (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -333,6 +333,8 @@ defmodule AMQP.Basic do """ @spec consume(Channel.t(), String.t(), pid | nil, keyword) :: {:ok, String.t()} | error def consume(%Channel{} = chan, queue, consumer_pid \\ nil, options \\ []) do + nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false) + consumer_tag = Keyword.get(options, :consumer_tag, "") basic_consume = basic_consume( queue: queue, @@ -340,7 +342,7 @@ defmodule AMQP.Basic do no_local: Keyword.get(options, :no_local, false), no_ack: Keyword.get(options, :no_ack, false), exclusive: Keyword.get(options, :exclusive, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) @@ -375,9 +377,10 @@ defmodule AMQP.Basic do consumer_pid end - case :amqp_channel.subscribe(chan.pid, basic_consume, pid) do - basic_consume_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag} - error -> {:error, error} + case {nowait, :amqp_channel.subscribe(chan.pid, basic_consume, pid)} do + {false, basic_consume_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} + {true, :ok} -> {:ok, consumer_tag} + {_, error} -> {:error, error} end end @@ -393,18 +396,17 @@ defmodule AMQP.Basic do ## Options - * `:no_wait` - If set, the cancel operation is asynchronous. Defaults to - `false`. - + * `:nowait` - If set, the cancel operation is asynchronous. Defaults to `false`. """ @spec cancel(Channel.t(), String.t(), keyword) :: {:ok, String.t()} | error def cancel(%Channel{pid: pid}, consumer_tag, options \\ []) do - basic_cancel = - basic_cancel(consumer_tag: consumer_tag, nowait: Keyword.get(options, :no_wait, false)) + nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false) + basic_cancel = basic_cancel(consumer_tag: consumer_tag, nowait: nowait) - case :amqp_channel.call(pid, basic_cancel) do - basic_cancel_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag} - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, basic_cancel)} do + {false, basic_cancel_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} + {true, :ok} -> {:ok, consumer_tag} + {_, error} -> {:error, error} end end diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index ce660e6..cee0a83 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -3,7 +3,7 @@ defmodule AMQP.SelectiveConsumer do TODO """ import AMQP.Core - alias AMQP.SelectiveConsumer + alias AMQP.{Channel, SelectiveConsumer} @behaviour :amqp_gen_consumer defstruct consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none @@ -24,9 +24,9 @@ defmodule AMQP.SelectiveConsumer do hence there is no consumer pid registered with the consumer tag. In this case, the relevant deliveries will be sent to the default consumer. """ - @spec register_default_consumer(pid, pid) :: :ok - def register_default_consumer(channel_pid, consumer_pid) do - :amqp_channel.call_consumer(channel_pid, {:register_default_consumer, consumer_pid}) + @spec register_default_consumer(Channel.t(), pid) :: :ok + def register_default_consumer(%Channel{pid: pid}, consumer_pid) do + :amqp_channel.call_consumer(pid, {:register_default_consumer, consumer_pid}) end @impl true diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs new file mode 100644 index 0000000..d6717d5 --- /dev/null +++ b/test/selective_consumer_test.exs @@ -0,0 +1,81 @@ +defmodule AMQP.SelectiveConsumerTest do + use ExUnit.Case + + alias AMQP.{Basic, Connection, Channel, Queue, SelectiveConsumer} + + defmodule InspectConsumer do + def message_loop do + receive do + :stop -> + :ok + + m -> + IO.inspect(m) + message_loop() + end + end + end + + setup do + {:ok, conn} = Connection.open() + {:ok, chan} = Channel.open(conn) + {:ok, %{queue: queue}} = Queue.declare(chan) + defaultc = spawn(&InspectConsumer.message_loop/0) + SelectiveConsumer.register_default_consumer(chan, defaultc) + + on_exit(fn -> + :ok = Connection.close(conn) + if Process.alive?(chan.pid) do + send(defaultc, :stop) + Queue.delete(chan, queue) + Channel.close(chan) + end + end) + + {:ok, conn: conn, chan: chan, queue: queue} + end + + describe "with nowait option" do + test "Basic.consume", meta do + consumer_tag = "hello" + opts = [nowait: true, consumer_tag: consumer_tag] + {:ok, ^consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self(), opts) + refute_receive {:basic_consume_ok, _} + + payload = "foo" + correlation_id = "correlation_id" + exchange = "" + routing_key = meta[:queue] + + Basic.publish(meta[:chan], exchange, routing_key, payload, correlation_id: correlation_id) + + assert_receive {:basic_deliver, ^payload, + %{ + consumer_tag: ^consumer_tag, + correlation_id: ^correlation_id, + routing_key: ^routing_key + }} + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag) + end + + test "Basic.cancel and default consumer", meta do + {:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self()) + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag, nowait: true) + refute_receive {:basic_cancel_ok, _} + end + end + + describe "with direct ctx" do + + end + + describe "basic_credit_drained" do + + end + + describe "with multiple queues" do + + end +end From b2d8ebe8348fbbc1e9c1e9721bcad1ac652a670d Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 14:11:20 +0000 Subject: [PATCH 08/29] Support multiple tags --- lib/amqp/selective_consumer.ex | 6 ++-- test/selective_consumer_test.exs | 54 +++++++++++++++++++------------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index cee0a83..af1be4b 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -106,7 +106,7 @@ defmodule AMQP.SelectiveConsumer do {:ok, consumer} -> c = Map.delete(status.consumers, tag) m = remove_from_monitors(status.monitors, consumer) - {:ok, %{status | consumers: c, monitors: m}} + %{status | consumers: c, monitors: m} _error -> # untracked consumer @@ -173,8 +173,8 @@ defmodule AMQP.SelectiveConsumer do end @impl true - def terminate(_reason, status) do - status + def terminate(_reason, _status) do + :ok end defp deliver(method, status) do diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs index d6717d5..894b9f1 100644 --- a/test/selective_consumer_test.exs +++ b/test/selective_consumer_test.exs @@ -20,16 +20,16 @@ defmodule AMQP.SelectiveConsumerTest do {:ok, conn} = Connection.open() {:ok, chan} = Channel.open(conn) {:ok, %{queue: queue}} = Queue.declare(chan) - defaultc = spawn(&InspectConsumer.message_loop/0) - SelectiveConsumer.register_default_consumer(chan, defaultc) + default_consumer = spawn(&InspectConsumer.message_loop/0) + SelectiveConsumer.register_default_consumer(chan, default_consumer) on_exit(fn -> - :ok = Connection.close(conn) if Process.alive?(chan.pid) do - send(defaultc, :stop) Queue.delete(chan, queue) + send(default_consumer, :stop) Channel.close(chan) end + :ok = Connection.close(conn) end) {:ok, conn: conn, chan: chan, queue: queue} @@ -42,24 +42,13 @@ defmodule AMQP.SelectiveConsumerTest do {:ok, ^consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self(), opts) refute_receive {:basic_consume_ok, _} - payload = "foo" - correlation_id = "correlation_id" - exchange = "" - routing_key = meta[:queue] - - Basic.publish(meta[:chan], exchange, routing_key, payload, correlation_id: correlation_id) - - assert_receive {:basic_deliver, ^payload, - %{ - consumer_tag: ^consumer_tag, - correlation_id: ^correlation_id, - routing_key: ^routing_key - }} + Basic.publish(meta[:chan], "", meta[:queue], "hi") + assert_receive {:basic_deliver, "hi", %{consumer_tag: ^consumer_tag}} {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag) end - test "Basic.cancel and default consumer", meta do + test "Basic.cancel", meta do {:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self()) {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag, nowait: true) @@ -67,15 +56,38 @@ defmodule AMQP.SelectiveConsumerTest do end end - describe "with direct ctx" do + describe "with multiple queues" do + setup meta do + {:ok, %{queue: queue2}} = Queue.declare(meta[:chan]) + + on_exit(fn -> + Queue.delete(meta[:chan], queue2) + end) + + {:ok, queue2: queue2} + end + test "does not delete the consumer until all gone", meta do + {:ok, consumer_tag1} = Basic.consume(meta[:chan], meta[:queue]) + {:ok, consumer_tag2} = Basic.consume(meta[:chan], meta[:queue2]) + + {:ok, ^consumer_tag1} = Basic.cancel(meta[:chan], consumer_tag1) + + Basic.publish(meta[:chan], "", meta[:queue2], "hi") + assert_receive {:basic_deliver, "hi", %{consumer_tag: ^consumer_tag2}} + + Basic.publish(meta[:chan], "", meta[:queue], "hola") + refute_receive {:basic_deliver, "hola", %{consumer_tag: ^consumer_tag1}} + + {:ok, ^consumer_tag2} = Basic.cancel(meta[:chan], consumer_tag2) + end end - describe "basic_credit_drained" do + describe "with direct ctx" do end - describe "with multiple queues" do + describe "basic_credit_drained" do end end From 389cf624697bb6875d1d8d3aa973f31d39ba4d40 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 15:21:47 +0000 Subject: [PATCH 09/29] Disable delivery_ctx support --- lib/amqp/selective_consumer.ex | 38 +++++++++++++++++++++----------- test/selective_consumer_test.exs | 4 ---- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index af1be4b..6890954 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -139,9 +139,14 @@ defmodule AMQP.SelectiveConsumer do end @impl true - def handle_deliver(method, message, delivery_ctx, status) do - deliver(method, message, delivery_ctx, status) - {:ok, status} + def handle_deliver(_method, _message, _delivery_ctx, _status) do + # The handler is called with delivery_ctx for direct connection. + # Since the library is not supporting direct connection, returns an error. + # + # deliver(method, message, delivery_ctx, status) + # {:ok, status} + + {:error, :undefined} end @impl true @@ -182,15 +187,23 @@ defmodule AMQP.SelectiveConsumer do end defp deliver(method, message, status) do - deliver(method, message, :undefined, status) - end - - defp deliver(method, message, delivery_ctx, status) do tag = tag(method) - composed = compose_message(method, message, delivery_ctx) + composed = compose_message(method, message) deliver_to_consumer_or_die(tag, composed, status) end + # delivery_ctx support is yet to come. + # + # defp deliver(method, message, delivery_ctx, status) do + # tag = tag(method) + # composed = + # method + # |> compose_message(message) + # |> Tuple.append(delivery_ctx) + # + # deliver_to_consumer_or_die(tag, composed, status) + # end + defp deliver_to_consumer_or_die(tag, message, status) do case resolve_consumer(tag, status) do {:consumer, pid} -> send(pid, message) @@ -200,17 +213,17 @@ defmodule AMQP.SelectiveConsumer do end # AMQP original: convert Erlang record to map - defp compose_message(basic_consume_ok() = method, _message, _ctx) do + defp compose_message(basic_consume_ok() = method, _message) do body = method |> basic_consume_ok() |> Enum.into(%{}) {:basic_consume_ok, body} end - defp compose_message(basic_cancel_ok() = method, _message, _ctx) do + defp compose_message(basic_cancel_ok() = method, _message) do body = method |> basic_cancel_ok() |> Enum.into(%{}) {:basic_cancel_ok, body} end - defp compose_message(basic_cancel() = method, _message, _ctx) do + defp compose_message(basic_cancel() = method, _message) do body = method |> basic_cancel() |> Enum.into(%{}) {:basic_cancel, body} end @@ -242,8 +255,7 @@ defmodule AMQP.SelectiveConsumer do cluster_id: cluster_id ), payload: payload - ), - _ctx + ) ) do {:basic_deliver, payload, %{ diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs index 894b9f1..167ba04 100644 --- a/test/selective_consumer_test.exs +++ b/test/selective_consumer_test.exs @@ -83,10 +83,6 @@ defmodule AMQP.SelectiveConsumerTest do end end - describe "with direct ctx" do - - end - describe "basic_credit_drained" do end From 75be928d769acf048bfdb69bd3ef999cc7b85260 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 17:13:07 +0000 Subject: [PATCH 10/29] Support credit_drained --- lib/amqp/selective_consumer.ex | 9 +++++---- test/selective_consumer_test.exs | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index 6890954..f564c06 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -159,7 +159,6 @@ defmodule AMQP.SelectiveConsumer do end def handle_info(basic_credit_drained() = method, status) do - # TODO: support composing the message deliver(method, status) {:ok, status} end @@ -228,6 +227,11 @@ defmodule AMQP.SelectiveConsumer do {:basic_cancel, body} end + defp compose_message(basic_credit_drained() = method, _message) do + body = method |> basic_credit_drained() |> Enum.into(%{}) + {:basic_credit_drained, body} + end + defp compose_message( basic_deliver( consumer_tag: consumer_tag, @@ -281,9 +285,6 @@ defmodule AMQP.SelectiveConsumer do }} end - # TODO: - # basic_credit_drained, basic_ack, basic_nack etc... - defp resolve_consumer(tag, %{consumers: consumers, default_consumer: default}) do case Map.fetch(consumers, tag) do {:ok, pid} -> diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs index 167ba04..d0c9ca0 100644 --- a/test/selective_consumer_test.exs +++ b/test/selective_consumer_test.exs @@ -1,6 +1,7 @@ defmodule AMQP.SelectiveConsumerTest do use ExUnit.Case + import AMQP.Core alias AMQP.{Basic, Connection, Channel, Queue, SelectiveConsumer} defmodule InspectConsumer do @@ -84,6 +85,21 @@ defmodule AMQP.SelectiveConsumerTest do end describe "basic_credit_drained" do + test "forwards the message to the end consumer", meta do + {:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue]) + # don't know how we can emit the message from the server so use send/2 + # to emulate the message + msg = basic_credit_drained(consumer_tag: consumer_tag) + consumer_pid = + meta[:chan].pid + |> :sys.get_state() + |> elem(3) + + send(consumer_pid, msg) + assert_receive {:basic_credit_drained, %{consumer_tag: ^consumer_tag}} + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag) + end end end From 818f45b3043aec13ffedaee1a562cae911dc9a98 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 20:03:38 +0000 Subject: [PATCH 11/29] Support return/confirm via consumer --- lib/amqp/basic.ex | 18 ++----- lib/amqp/confirm.ex | 6 +-- lib/amqp/selective_consumer.ex | 97 +++++++++++++++++++++++++++++++++- 3 files changed, 101 insertions(+), 20 deletions(-) diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 5029206..8229be9 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -5,7 +5,6 @@ defmodule AMQP.Basic do import AMQP.Core alias AMQP.{Channel, Utils, SelectiveConsumer} - alias AMQP.Channel.ReceiverManager @type error :: {:error, reason :: :blocked | :closing} @@ -352,10 +351,10 @@ defmodule AMQP.Basic do # https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_selective_consumer.erl # # It acts like a broker and distributes the messages to the process registered with :amqp_channel.subscribe/3. - # AMQP also provides another broker (Receiver/ReceiverManager) that transfors a message from Erlang record + # AMQP also provides another broker (DirectConsumer/SelectiveConsumer) that transfors a message from Erlang record # to Elixir friendly type and forwards the message to the process passed to this method. # - # [RabbitMQ] -> [Channel = SelectiveConsumer] -> [AMQP.Channel.Receiver] -> [consumer_pid] + # [RabbitMQ] -> [Channel] -> [SelectiveConsumer] -> [consumer_pid] # # If custom_consumer is set when the channel is open, the message handling is up to the consumer implementation. # @@ -363,12 +362,6 @@ defmodule AMQP.Basic do # pid = case chan.custom_consumer do - nil -> - %{pid: pid} = - ReceiverManager.register_handler(chan.pid, consumer_pid || self(), :consume) - - pid - {SelectiveConsumer, _} -> consumer_pid || self() @@ -416,9 +409,8 @@ defmodule AMQP.Basic do The registered process will receive `{:basic_return, payload, meta}` tuples. """ @spec return(Channel.t(), pid) :: :ok - def return(%Channel{pid: pid}, return_handler_pid) do - receiver = ReceiverManager.register_handler(pid, return_handler_pid, :return) - :amqp_channel.register_return_handler(pid, receiver.pid) + def return(%Channel{} = chan, return_handler_pid) do + :amqp_channel.call_consumer(chan.pid, {:register_return_handler, chan, return_handler_pid}) end @doc """ @@ -427,8 +419,6 @@ defmodule AMQP.Basic do """ @spec cancel_return(Channel.t()) :: :ok def cancel_return(%Channel{pid: pid}) do - # Currently we don't remove the receiver. - # The receiver will be deleted automatically when channel is closed. :amqp_channel.unregister_return_handler(pid) end end diff --git a/lib/amqp/confirm.ex b/lib/amqp/confirm.ex index 72f84c1..ea3058a 100644 --- a/lib/amqp/confirm.ex +++ b/lib/amqp/confirm.ex @@ -5,7 +5,6 @@ defmodule AMQP.Confirm do import AMQP.Core alias AMQP.{Basic, Channel} - alias AMQP.Channel.ReceiverManager @doc """ Activates publishing confirmations on the channel. @@ -71,9 +70,8 @@ defmodule AMQP.Confirm do see https://www.rabbitmq.com/confirms.html """ @spec register_handler(Channel.t(), pid) :: :ok - def register_handler(%Channel{pid: chan_pid}, handler_pid) do - receiver = ReceiverManager.register_handler(chan_pid, handler_pid, :confirm) - :amqp_channel.register_confirm_handler(chan_pid, receiver.pid) + def register_handler(%Channel{} = chan, handler_pid) do + :amqp_channel.call_consumer(chan.pid, {:register_confirm_handler, chan, handler_pid}) end @doc """ diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index f564c06..22edd06 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -6,13 +6,14 @@ defmodule AMQP.SelectiveConsumer do alias AMQP.{Channel, SelectiveConsumer} @behaviour :amqp_gen_consumer - defstruct consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none + defstruct consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none, return_handler: :none, confirm_handler: :none @type t :: %SelectiveConsumer{ consumers: %{String.t() => pid}, unassigned: pid | :undefined, monitors: %{pid => {integer, reference}}, - default_consumer: pid | :none + default_consumer: pid | :none, + return_handler: pid | :none } @doc """ @@ -163,6 +164,27 @@ defmodule AMQP.SelectiveConsumer do {:ok, status} end + def handle_info({basic_return() = method, message}, %{return_handler: pid} = status) when is_pid(pid) do + composed = compose_message(method, message) + send(pid, composed) + + {:ok, status} + end + + def handle_info(basic_ack() = method, %{confirm_handler: pid} = status) when is_pid(pid) do + composed = compose_message(method, :undefined) + send(pid, composed) + + {:ok, status} + end + + def handle_info(basic_nack() = method, %{confirm_handler: pid} = status) when is_pid(pid) do + composed = compose_message(method, :undefined) + send(pid, composed) + + {:ok, status} + end + @impl true def handle_call({:register_default_consumer, pid}, _from, status) do m = @@ -176,6 +198,18 @@ defmodule AMQP.SelectiveConsumer do {:reply, :ok, %{status | monitors: m, default_consumer: pid}} end + def handle_call({:register_return_handler, chan, handler_pid}, _from, status) do + :amqp_channel.register_return_handler(chan.pid, self()) + + {:reply, :ok, %{status | return_handler: handler_pid}} + end + + def handle_call({:register_confirm_handler, chan, handler_pid}, _from, status) do + :amqp_channel.register_confirm_handler(chan.pid, self()) + + {:reply, :ok, %{status | confirm_handler: handler_pid}} + end + @impl true def terminate(_reason, _status) do :ok @@ -285,6 +319,65 @@ defmodule AMQP.SelectiveConsumer do }} end + defp compose_message( + basic_return( + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do + {:basic_return, payload, + %{ + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + defp compose_message(basic_ack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_ack, delivery_tag, multiple} + end + + defp compose_message(basic_nack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_nack, delivery_tag, multiple} + end + defp resolve_consumer(tag, %{consumers: consumers, default_consumer: default}) do case Map.fetch(consumers, tag) do {:ok, pid} -> From 1499b77dae9e750b5b60b8482036ffd51ee30ded Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 20:10:41 +0000 Subject: [PATCH 12/29] Bye bye receiver --- lib/amqp/application.ex | 4 +- lib/amqp/channel/receiver.ex | 219 --------------------------- lib/amqp/channel/receiver_manager.ex | 128 ---------------- test/basic_test.exs | 9 -- test/channel/receiver_test.exs | 87 ----------- 5 files changed, 1 insertion(+), 446 deletions(-) delete mode 100644 lib/amqp/channel/receiver.ex delete mode 100644 lib/amqp/channel/receiver_manager.ex delete mode 100644 test/channel/receiver_test.exs diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index 2044f5b..f17036d 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -5,9 +5,7 @@ defmodule AMQP.Application do @impl true def start(_type, _args) do - children = [ - {AMQP.Channel.ReceiverManager, []} - ] + children = [] opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) diff --git a/lib/amqp/channel/receiver.ex b/lib/amqp/channel/receiver.ex deleted file mode 100644 index a90aa82..0000000 --- a/lib/amqp/channel/receiver.ex +++ /dev/null @@ -1,219 +0,0 @@ -defmodule AMQP.Channel.Receiver do - @moduledoc false - - import AMQP.Core - alias AMQP.Channel.ReceiverManager - - @doc """ - Handles channel messages. - """ - @spec handle_message(pid(), pid(), map()) :: no_return - def handle_message(chan_pid, client_pid, handlers) do - receive do - {:DOWN, _ref, :process, _pid, _reason} -> - ReceiverManager.unregister_receiver(chan_pid, client_pid) - - {:EXIT, _ref, _reason} -> - ReceiverManager.unregister_receiver(chan_pid, client_pid) - - {:add_handler, handler, opts} -> - new_handlers = add_handler(handlers, handler, opts) - handle_message(chan_pid, client_pid, new_handlers) - - msg -> - with true <- Process.alive?(client_pid), - new_handlers <- do_handle_message(client_pid, handlers, msg), - size when size > 0 <- map_size(new_handlers) do - handle_message(chan_pid, client_pid, new_handlers) - else - _ -> ReceiverManager.unregister_receiver(chan_pid, client_pid) - end - end - end - - defp add_handler(handlers, :consume, opts) do - if opts[:tag] do - consumer_tags = (handlers[:consume] || []) ++ [opts[:tag]] - Map.put(handlers, :consume, consumer_tags |> Enum.uniq()) - else - handlers - end - end - - defp add_handler(handlers, handler, _) do - Map.put(handlers, handler, true) - end - - defp remove_handler(handlers, :consume, opts) do - if handlers[:consume] && opts[:tag] do - consumer_tags = List.delete(handlers[:consume], opts[:tag]) - - if length(consumer_tags) == 0 do - Map.delete(handlers, :consume) - else - Map.put(handlers, :consume, consumer_tags) - end - else - handlers - end - end - - defp remove_handler(handlers, handler, _) do - Map.delete(handlers, handler) - end - - # -- Confirm.register_handler - - defp do_handle_message( - client_pid, - handlers, - basic_ack(delivery_tag: delivery_tag, multiple: multiple) - ) do - send(client_pid, {:basic_ack, delivery_tag, multiple}) - handlers - end - - defp do_handle_message( - client_pid, - handlers, - basic_nack(delivery_tag: delivery_tag, multiple: multiple) - ) do - send(client_pid, {:basic_nack, delivery_tag, multiple}) - handlers - end - - # -- Basic.consume - - defp do_handle_message(client_pid, handlers, basic_consume_ok(consumer_tag: consumer_tag)) do - send(client_pid, {:basic_consume_ok, %{consumer_tag: consumer_tag}}) - add_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message(client_pid, handlers, basic_cancel_ok(consumer_tag: consumer_tag)) do - send(client_pid, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}) - remove_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message( - client_pid, - handlers, - basic_cancel(consumer_tag: consumer_tag, nowait: no_wait) - ) do - send(client_pid, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) - remove_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message(client_pid, handlers, { - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - }) do - send( - client_pid, - {:basic_deliver, payload, - %{ - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) - - handlers - end - - defp do_handle_message(client_pid, handlers, { - basic_return( - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - }) do - send( - client_pid, - {:basic_return, payload, - %{ - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) - - handlers - end -end diff --git a/lib/amqp/channel/receiver_manager.ex b/lib/amqp/channel/receiver_manager.ex deleted file mode 100644 index 3562001..0000000 --- a/lib/amqp/channel/receiver_manager.ex +++ /dev/null @@ -1,128 +0,0 @@ -defmodule AMQP.Channel.ReceiverManager do - @moduledoc false - - # Manages receivers. - # - # AMQP relays messages from a channel to a client and convert - # [Record](https://hexdocs.pm/elixir/Record.html) to the data structure which - # is familiar with Elixir developer (Tuple, Map). - # We call the processes in between a channel and a client Receiver and - # this module manages them. - # - # This module ensures to have a single Receiver per a channel and a client. - # With that way, the sequence of the messages from channel would be always - # preserved. - - use GenServer - alias AMQP.Channel.Receiver - - @enforce_keys [:pid, :channel, :client] - @type t :: %__MODULE__{ - pid: pid(), - channel: pid(), - client: pid() - } - defstruct [:pid, :channel, :client] - - @doc false - @spec start_link(map) :: GenServer.on_start() - def start_link(ops \\ []) - def start_link([]), do: start_link(name: __MODULE__) - - def start_link(opts) do - GenServer.start_link(__MODULE__, %{}, opts) - end - - @impl true - def init(state = %{}) do - {:ok, state} - end - - @doc """ - Returns a receiver pid for the channel and client. - """ - @spec get_receiver(pid(), pid()) :: t() | nil - def get_receiver(channel, client) do - GenServer.call(__MODULE__, {:get, channel, client}) - end - - @doc """ - Unregisters a receiver from GenServer. - """ - @spec unregister_receiver(pid(), pid()) :: :ok - def unregister_receiver(channel, client) do - GenServer.call(__MODULE__, {:unregister, channel, client}) - end - - @doc """ - Registers a receiver pid for the channel and client. - """ - @spec register_handler(pid(), pid(), atom(), keyword()) :: t() | nil - def register_handler(channel, client, handler, opts \\ []) do - GenServer.call(__MODULE__, {:register_handler, channel, client, handler, opts}) - end - - @impl true - def handle_call({:get, channel, client}, _from, receivers) do - key = get_key(channel, client) - {:reply, Map.get(receivers, key), receivers} - end - - def handle_call({:unregister, channel, client}, _from, receivers) do - key = get_key(channel, client) - {:reply, :ok, Map.delete(receivers, key)} - end - - def handle_call({:register_handler, channel, client, handler, opts}, _from, receivers) do - receiver = - receivers - |> get_or_spawn_receiver(channel, client) - |> add_handler(handler, opts) - - key = get_key(channel, client) - {:reply, receiver, Map.put(receivers, key, receiver)} - end - - defp get_receiver(receivers, channel, client) do - key = get_key(channel, client) - - if (receiver = receivers[key]) && Process.alive?(receiver.pid) do - receiver - else - nil - end - end - - defp get_or_spawn_receiver(receivers, channel, client) do - if receiver = get_receiver(receivers, channel, client) do - receiver - else - spawn_receiver(channel, client) - end - end - - defp add_handler(receiver, handler, opts) do - send(receiver.pid, {:add_handler, handler, opts}) - receiver - end - - defp spawn_receiver(channel, client) do - receiver_pid = - spawn(fn -> - Process.flag(:trap_exit, true) - Process.monitor(channel) - Process.monitor(client) - Receiver.handle_message(channel, client, %{}) - end) - - %__MODULE__{ - pid: receiver_pid, - channel: channel, - client: client - } - end - - defp get_key(channel, client) do - "#{inspect(channel)}-#{inspect(client)}" - end -end diff --git a/test/basic_test.exs b/test/basic_test.exs index e6579d3..19433a9 100644 --- a/test/basic_test.exs +++ b/test/basic_test.exs @@ -2,7 +2,6 @@ defmodule BasicTest do use ExUnit.Case alias AMQP.{Basic, Connection, Channel, Queue} - alias Channel.ReceiverManager setup do {:ok, conn} = Connection.open() @@ -106,13 +105,5 @@ defmodule BasicTest do Process.flag(:trap_exit, true) assert {:normal, _} = catch_exit(Basic.cancel(meta[:chan], consumer_tag)) end - - test "removes a receiver when queue does not exist", meta do - catch_exit(Basic.consume(meta[:chan], "non-existent-queue")) - - :timer.sleep(100) - receiver = ReceiverManager.get_receiver(meta[:chan].pid, self()) - refute receiver - end end end diff --git a/test/channel/receiver_test.exs b/test/channel/receiver_test.exs deleted file mode 100644 index 330d7da..0000000 --- a/test/channel/receiver_test.exs +++ /dev/null @@ -1,87 +0,0 @@ -defmodule AMQP.Channel.ReceiverTest do - use ExUnit.Case - - alias AMQP.{Basic, Connection, Channel, Queue} - alias Channel.ReceiverManager - - setup do - {:ok, conn} = Connection.open() - {:ok, chan} = Channel.open(conn) - {:ok, %{queue: queue}} = Queue.declare(chan) - - on_exit(fn -> - Queue.delete(chan, queue) - :ok = Channel.close(chan) - :ok = Connection.close(conn) - end) - - {:ok, conn: conn, chan: chan, queue: queue} - end - - test "closes the receiver when channel is closed", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - :ok = Channel.close(chan) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - refute Process.alive?(receiver.pid) - end - - test "closes the receiver when the client is closed", meta do - {:ok, chan} = Channel.open(meta.conn) - - task = - Task.async(fn -> - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - ReceiverManager.get_receiver(chan.pid, self()) - end) - - receiver = Task.await(task) - :timer.sleep(100) - refute Process.alive?(task.pid) - refute Process.alive?(receiver.pid) - refute ReceiverManager.get_receiver(chan.pid, task.pid) - - :ok = Channel.close(chan) - end - - test "closes the receiver when all handlers are cancelled", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - {:ok, ^consumer_tag} = Basic.cancel(chan, consumer_tag) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - refute Process.alive?(receiver.pid) - - :ok = Channel.close(chan) - end - - test "unregisters the receiver when the register process dies", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - Process.exit(receiver.pid, :normal) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - - :ok = Channel.close(chan) - end -end From b7a7a6c78918801bb418ed3562410372fd9724c1 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 20:44:54 +0000 Subject: [PATCH 13/29] Refactor consumers --- lib/amqp/consumer_helper.ex | 140 +++++++++++++++++++++++++++++++++ lib/amqp/direct_consumer.ex | 107 ++++++++++--------------- lib/amqp/selective_consumer.ex | 133 +------------------------------ test/direct_consumer_test.exs | 4 +- 4 files changed, 187 insertions(+), 197 deletions(-) create mode 100644 lib/amqp/consumer_helper.ex diff --git a/lib/amqp/consumer_helper.ex b/lib/amqp/consumer_helper.ex new file mode 100644 index 0000000..5a44bb5 --- /dev/null +++ b/lib/amqp/consumer_helper.ex @@ -0,0 +1,140 @@ +defmodule AMQP.ConsumerHelper do + @moduledoc false + + import AMQP.Core + + @doc false + def compose_message(method, message \\ :undefined) + + def compose_message(basic_consume_ok() = method, _message) do + body = method |> basic_consume_ok() |> Enum.into(%{}) + {:basic_consume_ok, body} + end + + def compose_message(basic_cancel_ok() = method, _message) do + body = method |> basic_cancel_ok() |> Enum.into(%{}) + {:basic_cancel_ok, body} + end + + def compose_message(basic_cancel() = method, _message) do + body = method |> basic_cancel() |> Enum.into(%{}) + {:basic_cancel, body} + end + + def compose_message(basic_credit_drained() = method, _message) do + body = method |> basic_credit_drained() |> Enum.into(%{}) + {:basic_credit_drained, body} + end + + def compose_message( + basic_deliver( + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do + {:basic_deliver, payload, + %{ + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + def compose_message( + basic_return( + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do + {:basic_return, payload, + %{ + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + def compose_message(basic_ack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_ack, delivery_tag, multiple} + end + + def compose_message(basic_nack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_nack, delivery_tag, multiple} + end +end diff --git a/lib/amqp/direct_consumer.ex b/lib/amqp/direct_consumer.ex index b2a1a94..1db5bdb 100644 --- a/lib/amqp/direct_consumer.ex +++ b/lib/amqp/direct_consumer.ex @@ -16,6 +16,7 @@ defmodule AMQP.DirectConsumer do For more information see: https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation """ import AMQP.Core + import AMQP.ConsumerHelper @behaviour :amqp_gen_consumer ######################################################### @@ -35,85 +36,32 @@ defmodule AMQP.DirectConsumer do end @impl true - def handle_consume_ok(basic_consume_ok(consumer_tag: consumer_tag), _args, consumer) do - _ = send(consumer, {:basic_consume_ok, %{consumer_tag: consumer_tag}}) + def handle_consume_ok(method, _args, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do - _ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) + def handle_cancel(method, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_cancel_ok(basic_cancel_ok(consumer_tag: consumer_tag), _args, consumer) do - _ = send(consumer, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}) + def handle_cancel_ok(method, _args, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_server_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do - _ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) + def handle_server_cancel(method, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_deliver( - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ), - consumer - ) do - send( - consumer, - {:basic_deliver, payload, - %{ - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) + def handle_deliver(method, message, consumer) do + send(consumer, compose_message(method, message)) {:ok, consumer} end @@ -135,11 +83,42 @@ defmodule AMQP.DirectConsumer do end def handle_info(down = {:DOWN, _, _, _, _}, consumer) do - _ = send(consumer, down) + send(consumer, down) + {:ok, consumer} + end + + def handle_info({basic_return() = method, message}, consumer) do + send(consumer, compose_message(method, message)) + + {:ok, consumer} + end + + def handle_info(basic_ack() = method, consumer) do + send(consumer, compose_message(method)) + + {:ok, consumer} + end + + def handle_info(basic_nack() = method, consumer) do + send(consumer, compose_message(method)) + {:ok, consumer} end @impl true + def handle_call({:register_return_handler, chan, consumer}, _from, consumer) do + :amqp_channel.register_return_handler(chan.pid, self()) + + {:reply, :ok, consumer} + end + + def handle_call({:register_confirm_handler, chan, consumer}, _from, consumer) do + :amqp_channel.register_confirm_handler(chan.pid, self()) + + {:reply, :ok, consumer} + end + + def handle_call(_req, _from, consumer) do {:reply, {:error, :undefined}, consumer} end diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index 22edd06..ebb9035 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -2,7 +2,9 @@ defmodule AMQP.SelectiveConsumer do @moduledoc """ TODO """ + import AMQP.Core + import AMQP.ConsumerHelper alias AMQP.{Channel, SelectiveConsumer} @behaviour :amqp_gen_consumer @@ -246,137 +248,6 @@ defmodule AMQP.SelectiveConsumer do end # AMQP original: convert Erlang record to map - defp compose_message(basic_consume_ok() = method, _message) do - body = method |> basic_consume_ok() |> Enum.into(%{}) - {:basic_consume_ok, body} - end - - defp compose_message(basic_cancel_ok() = method, _message) do - body = method |> basic_cancel_ok() |> Enum.into(%{}) - {:basic_cancel_ok, body} - end - - defp compose_message(basic_cancel() = method, _message) do - body = method |> basic_cancel() |> Enum.into(%{}) - {:basic_cancel, body} - end - - defp compose_message(basic_credit_drained() = method, _message) do - body = method |> basic_credit_drained() |> Enum.into(%{}) - {:basic_credit_drained, body} - end - - defp compose_message( - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - ) do - {:basic_deliver, payload, - %{ - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - end - - defp compose_message( - basic_return( - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - ) do - {:basic_return, payload, - %{ - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - end - - defp compose_message(basic_ack(delivery_tag: delivery_tag, multiple: multiple), _message) do - {:basic_ack, delivery_tag, multiple} - end - - defp compose_message(basic_nack(delivery_tag: delivery_tag, multiple: multiple), _message) do - {:basic_nack, delivery_tag, multiple} - end defp resolve_consumer(tag, %{consumers: consumers, default_consumer: default}) do case Map.fetch(consumers, tag) do diff --git a/test/direct_consumer_test.exs b/test/direct_consumer_test.exs index b24af97..07ed02c 100644 --- a/test/direct_consumer_test.exs +++ b/test/direct_consumer_test.exs @@ -14,7 +14,7 @@ defmodule DirectConsumerTest do send(receiver_pid, :stop) end) - {:ok, conn: conn, chan: chan} + {:ok, conn: conn, chan: chan, receiver_pid: receiver_pid} end def simple_receiver(pid) do @@ -33,7 +33,7 @@ defmodule DirectConsumerTest do end test "basic return", meta do - :ok = Basic.return(meta[:chan], self()) + :ok = Basic.return(meta[:chan], meta[:receiver_pid]) exchange = "" routing_key = "non-existent-queue" From 5b05b5dd299e94ae52630c37843793ee1a690ae8 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Wed, 23 Dec 2020 20:45:26 +0000 Subject: [PATCH 14/29] mix format --- lib/amqp/basic.ex | 1 + lib/amqp/consumer_helper.ex | 110 +++++++++++++++---------------- lib/amqp/direct_consumer.ex | 1 - lib/amqp/selective_consumer.ex | 10 ++- test/selective_consumer_test.exs | 2 + 5 files changed, 66 insertions(+), 58 deletions(-) diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 8229be9..3d1e96f 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -334,6 +334,7 @@ defmodule AMQP.Basic do def consume(%Channel{} = chan, queue, consumer_pid \\ nil, options \\ []) do nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false) consumer_tag = Keyword.get(options, :consumer_tag, "") + basic_consume = basic_consume( queue: queue, diff --git a/lib/amqp/consumer_helper.ex b/lib/amqp/consumer_helper.ex index 5a44bb5..40abf16 100644 --- a/lib/amqp/consumer_helper.ex +++ b/lib/amqp/consumer_helper.ex @@ -27,34 +27,34 @@ defmodule AMQP.ConsumerHelper do end def compose_message( - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - ) do + basic_deliver( + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do {:basic_deliver, payload, %{ consumer_tag: consumer_tag, @@ -80,33 +80,33 @@ defmodule AMQP.ConsumerHelper do end def compose_message( - basic_return( - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - ) do + basic_return( + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do {:basic_return, payload, %{ reply_code: reply_code, diff --git a/lib/amqp/direct_consumer.ex b/lib/amqp/direct_consumer.ex index 1db5bdb..254dd1d 100644 --- a/lib/amqp/direct_consumer.ex +++ b/lib/amqp/direct_consumer.ex @@ -118,7 +118,6 @@ defmodule AMQP.DirectConsumer do {:reply, :ok, consumer} end - def handle_call(_req, _from, consumer) do {:reply, {:error, :undefined}, consumer} end diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index ebb9035..585c307 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -8,7 +8,12 @@ defmodule AMQP.SelectiveConsumer do alias AMQP.{Channel, SelectiveConsumer} @behaviour :amqp_gen_consumer - defstruct consumers: %{}, unassigned: :undefined, monitors: %{}, default_consumer: :none, return_handler: :none, confirm_handler: :none + defstruct consumers: %{}, + unassigned: :undefined, + monitors: %{}, + default_consumer: :none, + return_handler: :none, + confirm_handler: :none @type t :: %SelectiveConsumer{ consumers: %{String.t() => pid}, @@ -166,7 +171,8 @@ defmodule AMQP.SelectiveConsumer do {:ok, status} end - def handle_info({basic_return() = method, message}, %{return_handler: pid} = status) when is_pid(pid) do + def handle_info({basic_return() = method, message}, %{return_handler: pid} = status) + when is_pid(pid) do composed = compose_message(method, message) send(pid, composed) diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs index d0c9ca0..bf7fe57 100644 --- a/test/selective_consumer_test.exs +++ b/test/selective_consumer_test.exs @@ -30,6 +30,7 @@ defmodule AMQP.SelectiveConsumerTest do send(default_consumer, :stop) Channel.close(chan) end + :ok = Connection.close(conn) end) @@ -91,6 +92,7 @@ defmodule AMQP.SelectiveConsumerTest do # don't know how we can emit the message from the server so use send/2 # to emulate the message msg = basic_credit_drained(consumer_tag: consumer_tag) + consumer_pid = meta[:chan].pid |> :sys.get_state() From 477da36a4145e5f6a8b380702ee6bd770cfa6c40 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 24 Dec 2020 11:12:15 +0000 Subject: [PATCH 15/29] Add moduledoc --- lib/amqp/selective_consumer.ex | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex index 585c307..3384559 100644 --- a/lib/amqp/selective_consumer.ex +++ b/lib/amqp/selective_consumer.ex @@ -1,6 +1,12 @@ defmodule AMQP.SelectiveConsumer do @moduledoc """ - TODO + This is an Elixir reimplementation of `:amqp_selective_consumer` - [source](https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/amqp_client/src/amqp_selective_consumer.erl) + + The module is used by default when you open a channel via `AMQP.Channel.open/2` and allows you + to end consumer processes via `Basic.consume/4` and receive messages from a queue. + + Usually you don't have to pay attention to this module as the interaction would be made through + `AMQP.Channel.open/2`, `Basic.consume/4`, `Basic.return/2`, `Confirm.register_handler/2` etc. """ import AMQP.Core From 20cdc3188c5ecb770318c80ca1fcc3da816596b6 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 24 Dec 2020 15:44:01 +0000 Subject: [PATCH 16/29] Handle nowait properly --- lib/amqp/basic.ex | 4 +-- lib/amqp/exchange.ex | 57 ++++++++++++++++++++++++++--------------- lib/amqp/queue.ex | 61 ++++++++++++++++++++++++++++---------------- test/queue_test.exs | 19 ++++++++++++++ 4 files changed, 97 insertions(+), 44 deletions(-) diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 3d1e96f..6e98b2a 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -372,8 +372,8 @@ defmodule AMQP.Basic do end case {nowait, :amqp_channel.subscribe(chan.pid, basic_consume, pid)} do - {false, basic_consume_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} {true, :ok} -> {:ok, consumer_tag} + {_, basic_consume_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} {_, error} -> {:error, error} end end @@ -398,8 +398,8 @@ defmodule AMQP.Basic do basic_cancel = basic_cancel(consumer_tag: consumer_tag, nowait: nowait) case {nowait, :amqp_channel.call(pid, basic_cancel)} do - {false, basic_cancel_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} {true, :ok} -> {:ok, consumer_tag} + {_, basic_cancel_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} {_, error} -> {:error, error} end end diff --git a/lib/amqp/exchange.ex b/lib/amqp/exchange.ex index b1d6c5d..672795e 100644 --- a/lib/amqp/exchange.ex +++ b/lib/amqp/exchange.ex @@ -25,7 +25,7 @@ defmodule AMQP.Exchange do * `:auto_delete` - If set, deletes the Exchange once all queues unbind from it; * `:passive` - If set, returns an error if the Exchange does not already exist; * `:internal` - If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -33,6 +33,8 @@ defmodule AMQP.Exchange do """ @spec declare(Channel.t(), Basic.exchange(), type :: atom, keyword) :: :ok | Basic.error() def declare(%Channel{pid: pid}, exchange, type \\ :direct, options \\ []) do + nowait = get_nowait(options) + exchange_declare = exchange_declare( exchange: exchange, @@ -41,13 +43,14 @@ defmodule AMQP.Exchange do durable: Keyword.get(options, :durable, false), auto_delete: Keyword.get(options, :auto_delete, false), internal: Keyword.get(options, :internal, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_declare) do - exchange_declare_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_declare)} do + {true, :ok} -> :ok + {_, exchange_declare_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -59,22 +62,25 @@ defmodule AMQP.Exchange do * `:if_unused` - If set, the server will only delete the exchange if it has no queue bindings. - * `:no_wait` - If set, the delete operation is asynchronous. Defaults to + * `:nowait` - If set, the delete operation is asynchronous. Defaults to `false`. """ @spec delete(Channel.t(), Basic.exchange(), keyword) :: :ok | Basic.error() def delete(%Channel{pid: pid}, exchange, options \\ []) do + nowait = get_nowait(options) + exchange_delete = exchange_delete( exchange: exchange, if_unused: Keyword.get(options, :if_unused, false), - nowait: Keyword.get(options, :no_wait, false) + nowait: nowait ) - case :amqp_channel.call(pid, exchange_delete) do - exchange_delete_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_delete)} do + {true, :ok} -> :ok + {_, exchange_delete_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -85,7 +91,7 @@ defmodule AMQP.Exchange do ## Options * `:routing_key` - the routing key to use for the binding. Defaults to `""`. - * `:no_wait` - If set, the bind operation is asynchronous. Defaults to + * `:nowait` - If set, the bind operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when binding (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -94,18 +100,21 @@ defmodule AMQP.Exchange do @spec bind(Channel.t(), destination :: String.t(), source :: String.t(), keyword) :: :ok | Basic.error() def bind(%Channel{pid: pid}, destination, source, options \\ []) do + nowait = get_nowait(options) + exchange_bind = exchange_bind( destination: destination, source: source, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_bind) do - exchange_bind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_bind)} do + {true, :ok} -> :ok + {_, exchange_bind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -116,7 +125,7 @@ defmodule AMQP.Exchange do ## Options * `:routing_key` - the routing key to use for the binding. Defaults to `""`. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -125,18 +134,21 @@ defmodule AMQP.Exchange do @spec unbind(Channel.t(), destination :: String.t(), source :: String.t(), keyword) :: :ok | Basic.error() def unbind(%Channel{pid: pid}, destination, source, options \\ []) do + nowait = get_nowait(options) + exchange_unbind = exchange_unbind( destination: destination, source: source, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_unbind) do - exchange_unbind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_unbind)} do + {true, :ok} -> :ok + {_, exchange_unbind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -175,4 +187,9 @@ defmodule AMQP.Exchange do def topic(%Channel{} = channel, exchange, options \\ []) do declare(channel, exchange, :topic, options) end + + # support backward compatibility with old key name + defp get_nowait(opts) do + Keyword.get(opts, :nowait, false) || Keyword.get(opts, :no_wait, false) + end end diff --git a/lib/amqp/queue.ex b/lib/amqp/queue.ex index d69d527..d9c51d8 100644 --- a/lib/amqp/queue.ex +++ b/lib/amqp/queue.ex @@ -23,14 +23,16 @@ defmodule AMQP.Queue do Defaults to `false`. * `:passive` - If set, raises an error unless the queue already exists. Defaults to `false`. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. """ - @spec declare(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | Basic.error() + @spec declare(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | :ok | Basic.error() def declare(%Channel{pid: pid}, queue \\ "", options \\ []) do + nowait = get_nowait(options) + queue_declare = queue_declare( queue: queue, @@ -38,19 +40,23 @@ defmodule AMQP.Queue do durable: Keyword.get(options, :durable, false), exclusive: Keyword.get(options, :exclusive, false), auto_delete: Keyword.get(options, :auto_delete, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) |> Utils.to_type_tuple() ) - case :amqp_channel.call(pid, queue_declare) do - queue_declare_ok( - queue: queue, - message_count: message_count, - consumer_count: consumer_count - ) -> + case {nowait, :amqp_channel.call(pid, queue_declare)} do + {true, :ok} -> + :ok + + {_, + queue_declare_ok( + queue: queue, + message_count: message_count, + consumer_count: consumer_count + )} -> {:ok, %{queue: queue, message_count: message_count, consumer_count: consumer_count}} - error -> + {_, error} -> {:error, error} end end @@ -62,25 +68,28 @@ defmodule AMQP.Queue do * `:routing_key` - The routing key used to bind the queue to the exchange. Defaults to `""`. - * `:no_wait` - If `true`, the binding is not synchronous. Defaults to `false`. + * `:nowait` - If `true`, the binding is not synchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when binding (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. """ @spec bind(Channel.t(), Basic.queue(), Basic.exchange(), keyword) :: :ok | Basic.error() def bind(%Channel{pid: pid}, queue, exchange, options \\ []) do + nowait = get_nowait(options) + queue_bind = queue_bind( queue: queue, exchange: exchange, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) |> Utils.to_type_tuple() ) - case :amqp_channel.call(pid, queue_bind) do - queue_bind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, queue_bind)} do + {true, :ok} -> :ok + {_, queue_bind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -119,22 +128,25 @@ defmodule AMQP.Queue do consumers. If the queue has consumers, it's not deleted and an error is returned. * `:if_empty` - If set, the server will only delete the queue if it has no messages. - * `:no_wait` - If set, the delete operation is asynchronous. + * `:nowait` - If set, the delete operation is asynchronous. """ - @spec delete(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | Basic.error() + @spec delete(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | :ok | Basic.error() def delete(%Channel{pid: pid}, queue, options \\ []) do + nowait = get_nowait(options) + queue_delete = queue_delete( queue: queue, if_unused: Keyword.get(options, :if_unused, false), if_empty: Keyword.get(options, :if_empty, false), - nowait: Keyword.get(options, :no_wait, false) + nowait: nowait ) - case :amqp_channel.call(pid, queue_delete) do - queue_delete_ok(message_count: message_count) -> {:ok, %{message_count: message_count}} - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, queue_delete)} do + {true, :ok} -> :ok + {_, queue_delete_ok(message_count: message_count)} -> {:ok, %{message_count: message_count}} + {_, error} -> {:error, error} end end @@ -228,7 +240,7 @@ defmodule AMQP.Queue do do_consume(channel, fun, consumer_tag) - {:basic_cancel, %{consumer_tag: ^consumer_tag, no_wait: _}} -> + {:basic_cancel, %{consumer_tag: ^consumer_tag}} -> exit(:basic_cancel) {:basic_cancel_ok, %{consumer_tag: ^consumer_tag}} -> @@ -245,4 +257,9 @@ defmodule AMQP.Queue do def unsubscribe(%Channel{} = channel, consumer_tag) do Basic.cancel(channel, consumer_tag) end + + # support backward compatibility with old key name + defp get_nowait(opts) do + Keyword.get(opts, :nowait, false) || Keyword.get(opts, :no_wait, false) + end end diff --git a/test/queue_test.exs b/test/queue_test.exs index a15ff8f..0c0fb35 100644 --- a/test/queue_test.exs +++ b/test/queue_test.exs @@ -20,6 +20,11 @@ defmodule QueueTest do assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) end + test "delare queue with nowait option", meta do + assert :ok = Queue.declare(meta[:chan], "hello", nowait: true) + assert :ok = Queue.delete(meta[:chan], "hello", nowait: true) + end + test "declare queue with explicitly assigned name", meta do name = rand_name() @@ -43,6 +48,20 @@ defmodule QueueTest do assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) end + test "bind with nowait option", meta do + queue = rand_name() + exchange = rand_name() + assert :ok = Exchange.fanout(meta[:chan], exchange) + + assert {:ok, %{queue: ^queue, message_count: 0, consumer_count: 0}} = + Queue.declare(meta[:chan], queue) + + assert :ok = Queue.bind(meta[:chan], queue, exchange, nowait: true) + assert :ok = Queue.unbind(meta[:chan], queue, exchange) + assert :ok = Exchange.delete(meta[:chan], exchange) + assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) + end + test "status returns message count and consumer count", meta do queue = rand_name() From 5d816e2da7288422ac721d4e19ce82ab79f1436e Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 24 Dec 2020 16:07:05 +0000 Subject: [PATCH 17/29] Doesn't check the reason (it doesn't seem to be consistent) --- test/basic_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/basic_test.exs b/test/basic_test.exs index 19433a9..b32fa52 100644 --- a/test/basic_test.exs +++ b/test/basic_test.exs @@ -103,7 +103,7 @@ defmodule BasicTest do Channel.close(meta[:chan]) Process.flag(:trap_exit, true) - assert {:normal, _} = catch_exit(Basic.cancel(meta[:chan], consumer_tag)) + assert catch_exit(Basic.cancel(meta[:chan], consumer_tag)) end end end From fa1336042318ca15d86ba78eaf04883d9f21128b Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 27 Dec 2020 11:06:29 +0000 Subject: [PATCH 18/29] Accept integer for expiration and DateTime for timestamp --- lib/amqp/basic.ex | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 6e98b2a..9495c82 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -73,9 +73,9 @@ defmodule AMQP.Basic do priority: Keyword.get(options, :priority, :undefined), correlation_id: Keyword.get(options, :correlation_id, :undefined), reply_to: Keyword.get(options, :reply_to, :undefined), - expiration: Keyword.get(options, :expiration, :undefined), + expiration: Keyword.get(options, :expiration, :undefined) |> number_to_s(), message_id: Keyword.get(options, :message_id, :undefined), - timestamp: Keyword.get(options, :timestamp, :undefined), + timestamp: Keyword.get(options, :timestamp, :undefined) |> to_epoch(), type: Keyword.get(options, :type, :undefined), user_id: Keyword.get(options, :user_id, :undefined), app_id: Keyword.get(options, :app_id, :undefined), @@ -422,4 +422,10 @@ defmodule AMQP.Basic do def cancel_return(%Channel{pid: pid}) do :amqp_channel.unregister_return_handler(pid) end + + defp number_to_s(value) when is_number(value), do: to_string(value) + defp number_to_s(value), do: value + + defp to_epoch(%DateTime{} = value), do: DateTime.to_unix(value) + defp to_epoch(value), do: value end From 9b3a4cad8ce514dec74689f710bdb6e69c04d8b4 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Mon, 28 Dec 2020 11:12:15 +0000 Subject: [PATCH 19/29] Disable Erlang library's pregress report by default --- README.md | 33 ++++++++++++++++++++------------- lib/amqp/application.ex | 29 +++++++++++++++++++++++++++++ mix.exs | 2 +- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 379f815..7a34a17 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ Simple Elixir wrapper for the Erlang RabbitMQ client. The API is based on Langohr, a Clojure client for RabbitMQ. +## Migration from 1.X to 2.X + +TO BE WRITTEN + ## Migration from 0.X to 1.X If you use amqp 0.X and plan to migrate to 1.0 please read our [migration guide](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0). @@ -167,7 +171,21 @@ Error converting Hello, World! to integer Error converting Hello, World! to integer ``` -## Stable RabbitMQ Connection +### Configuration + +#### Erlang library's progress report + +This library uses an official Erlang RabbitMQ client library internally and we found its logging is too verbose. +These are called progress reports by the Erlang library and you would see a lot of entries with info log level if you +use 1.x version. +AMQP disables that by default from version 2.0. +If you want to see more detailed logs, you can enable it by adding the following line on your config. + +```elixir +config :amqp, enable_progress_report: true +``` + +### Stable RabbitMQ Connection While the above example works, it does nothing to handle RabbitMQ connection outages. In case of an outage your Genserver will remain stale and won't @@ -235,7 +253,7 @@ When the connection drops or the server is down, the GenServer will stop. If you have put the GenServer module to your application tree, the Supervisor will automatically restart it. Then it will try to reconnect indefinitely until it succeeds. -## Types of arguments and headers +### Types of arguments and headers The parameter `arguments` in `Queue.declare`, `Exchange.declare`, `Basic.consume` and the parameter `headers` in `Basic.publish` are a list of tuples in the form `{name, type, value}`, where `name` is a binary containing the argument/header name, `type` is an atom describing the AMQP field type and `value` a term compatible with the AMQP field type. @@ -312,17 +330,6 @@ Try the following configuration. config :logger, handle_otp_reports: false ``` -Or try filtering out the messages at your application start: - -```elixir -:logger.add_primary_filter( - :ignore_rabbitmq_progress_reports, - {&:logger_filters.domain/2, {:stop, :equal, [:progress]}} -) -``` - -See [this comment](https://github.com/pma/amqp/issues/110#issuecomment-442761299) for the -details. #### Lager conflicts with Elixir logger diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index f17036d..36cb55e 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -2,12 +2,41 @@ defmodule AMQP.Application do @moduledoc false use Application + require Logger @impl true def start(_type, _args) do children = [] + load_config() + opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) end + + defp load_config do + unless Application.get_env(:amqp, :enable_progress_report, false) do + disable_progress_report() + end + end + + @doc """ + Disables the progress report logging from Erlang library. + + The log outputs are very verbose and can contain credentials. + This AMQP library recommends to disable unless you want the information. + """ + @spec disable_progress_report :: :ok | {:error, any} + def disable_progress_report do + :logger.add_primary_filter( + :amqp_ignore_rabbitmq_progress_reports, + {&:logger_filters.domain/2, {:stop, :equal, [:progress]}} + ) + + :ok + rescue + e -> + Logger.warn("Failed to disable progress report by Erlang library: detail: #{inspect(e)}") + {:error, e} + end end diff --git a/mix.exs b/mix.exs index bdfb077..1a46b47 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule AMQP.Mixfile do def application do [ - applications: [:lager, :amqp_client], + applications: [:lager, :amqp_client, :logger], mod: {AMQP.Application, []} ] end From 78e79fa7e9861b1f000b53667099ed18b90bd27d Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 29 Dec 2020 10:04:09 +0000 Subject: [PATCH 20/29] Provide a function to re-enable the report --- lib/amqp/application.ex | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index 36cb55e..f7a6898 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -32,11 +32,22 @@ defmodule AMQP.Application do :amqp_ignore_rabbitmq_progress_reports, {&:logger_filters.domain/2, {:stop, :equal, [:progress]}} ) - - :ok rescue e -> Logger.warn("Failed to disable progress report by Erlang library: detail: #{inspect(e)}") {:error, e} end + + @doc """ + Enables the progress report logging from Erlang library. + """ + @spec enable_progress_report :: :ok | {:error, any} + def enable_progress_report do + case :logger.remove_primary_filter(:amqp_ignore_rabbitmq_progress_reports) do + :ok -> :ok + # filter already removed + {:error, {:not_found, _}} -> :ok + error -> error + end + end end From b82ffa2e2ed78c0362f3188e8a238f1a429ac828 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 29 Dec 2020 14:52:11 +0000 Subject: [PATCH 21/29] Create a connection from config --- lib/amqp/application.ex | 15 +++- lib/amqp/application/connection.ex | 122 +++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 lib/amqp/application/connection.ex diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index f7a6898..b129677 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -6,9 +6,8 @@ defmodule AMQP.Application do @impl true def start(_type, _args) do - children = [] - load_config() + children = load_connections() |> IO.inspect() opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) @@ -20,6 +19,18 @@ defmodule AMQP.Application do end end + defp load_connections do + conn = Application.get_env(:amqp, :connection) + conns = Application.get_env(:amqp, :connections, []) + conns = if conn, do: conns ++ [default: conn], else: conns + + Enum.map(conns, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Connection.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Connection, arg}, id: id) + end) + end + @doc """ Disables the progress report logging from Erlang library. diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex new file mode 100644 index 0000000..7352cf1 --- /dev/null +++ b/lib/amqp/application/connection.ex @@ -0,0 +1,122 @@ +defmodule AMQP.Application.Connection do + @moduledoc """ + + """ + + use GenServer + require Logger + alias AMQP.Connection + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + It expects options to be a combination of connection args, proc_name and retry_interval. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_conn, retry_interval: 10_000, host: "localhost"] + iex> :ok = AMQP.Application.Connection.start_link(opts) + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:my_conn) + + Passes URL instead of options and use a default proc name when you need only a single connection. + + iex> opts = [url: "amqp://guest:guest@localhost"] + iex> {:ok, conn} = AMQP.Application.Connection.get_connection() + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + open_arg = Keyword.drop(opts, [:proc_name, :retry_interval]) + + init_arg = %{ + retry_interval: retry_interval, + open_arg: open_arg, + name: proc_name, + connection: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the connection name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"amqp_connection_#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns a connection referred by the name. + """ + @spec get_connection(binary | atom) :: {:ok, Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + case GenServer.call(get_server_name(name), :get_connection) do + nil -> {:error, :not_connected} + conn -> {:ok, conn} + end + end + + @impl true + def init(state) do + send(self(), :connect) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_connection, _, state) do + {:reply, state[:connection], state} + end + + @impl true + def handle_info(:connect, state) do + case do_open(state[:open_arg]) do + {:ok, conn} -> + # Get notifications when the connection goes down + Process.monitor(conn.pid) + {:noreply, %{state | connection: conn}} + + {:error, _} -> + Logger.error("Failed to connect to AMQP server (#{state[:name]}). Retrying later...") + + # Retry later + Process.send_after(self(), :connect, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, _pid, reason}, _) do + # Stop GenServer. Will be restarted by Supervisor. + {:stop, {:connection_lost, reason}, nil} + end + + defp do_open(options) do + if url = options[:url] do + Connection.open(url, Keyword.delete(options, :url)) + else + Connection.open(options) + end + end +end From 1b31d81e0e3db21bfc165782977423d36dbe0e4b Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 29 Dec 2020 17:14:48 +0000 Subject: [PATCH 22/29] Channel management --- lib/amqp/application.ex | 22 ++++- lib/amqp/application/channel.ex | 130 +++++++++++++++++++++++++++++ lib/amqp/application/connection.ex | 23 +++-- 3 files changed, 167 insertions(+), 8 deletions(-) create mode 100644 lib/amqp/application/channel.ex diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index b129677..34c1d3f 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -7,9 +7,15 @@ defmodule AMQP.Application do @impl true def start(_type, _args) do load_config() - children = load_connections() |> IO.inspect() + children = load_connections() ++ load_channels() + + opts = [ + strategy: :one_for_one, + name: AMQP.Application, + max_restarts: length(children) * 2, + max_seconds: 1 + ] - opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) end @@ -31,6 +37,18 @@ defmodule AMQP.Application do end) end + defp load_channels do + chan = Application.get_env(:amqp, :channel) + chans = Application.get_env(:amqp, :channels, []) + chans = if chan, do: chans ++ [default: chan], else: chans + + Enum.map(chans, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Channel.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Channel, arg}, id: id) + end) + end + @doc """ Disables the progress report logging from Erlang library. diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex new file mode 100644 index 0000000..fb3e924 --- /dev/null +++ b/lib/amqp/application/channel.ex @@ -0,0 +1,130 @@ +defmodule AMQP.Application.Channel do + @moduledoc false + + use GenServer + require Logger + alias AMQP.{Channel, Connection} + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn] + iex> :ok = AMQP.Application.Channel.start_link(opts) + iex> {:ok, chan} = AMQP.Application.Connection.get_connection(:my_chan) + + If you omit the proc_name, it uses :default. + + iex> :ok = AMQP.Application.Channel.start_link([]) + iex> {:ok, chan} = AMQP.Application.Connection.get_channel() + iex> {:ok, chan} = AMQP.Application.Connection.get_channel(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + connection = Keyword.get(opts, :connection, proc_name) + + init_arg = %{ + retry_interval: retry_interval, + connection: connection, + name: proc_name, + channel: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the channel name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"#{__MODULE__}::#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + + @doc """ + Returns a channel referred by the name. + """ + @spec get_channel(binary | atom) :: {:ok, Connection.t()} | {:error, any} + def get_channel(name \\ :default) do + case GenServer.call(get_server_name(name), :get_channel) do + nil -> {:error, :channel_not_ready} + channel -> {:ok, channel} + end + end + + @impl true + def init(state) do + send(self(), :open) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_channel, _, state) do + {:reply, state[:channel], state} + end + + @impl true + def handle_info(:open, state) do + case AMQP.Application.Connection.get_connection(state[:connection]) do + {:ok, conn} -> + case Channel.open(conn) do + {:ok, chan} -> + Process.monitor(chan.pid) + {:noreply, %{state | channel: chan}} + + {:error, error} -> + Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}") + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + + _error -> + Logger.error( + "Failed to open an AMQP channel(#{state[:name]}). Connection (#{state[:connection]}) is not ready." + ) + + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, _pid, reason}, state) do + # Stop GenServer. Will be restarted by Supervisor. + {:stop, {:channel_gone, reason}, nil} + end +end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index 7352cf1..5a04450 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -1,7 +1,5 @@ defmodule AMQP.Application.Connection do - @moduledoc """ - - """ + @moduledoc false use GenServer require Logger @@ -25,6 +23,7 @@ defmodule AMQP.Application.Connection do Passes URL instead of options and use a default proc name when you need only a single connection. iex> opts = [url: "amqp://guest:guest@localhost"] + iex> :ok = AMQP.Application.Connection.start_link(opts) iex> {:ok, conn} = AMQP.Application.Connection.get_connection() iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:default) """ @@ -56,7 +55,7 @@ defmodule AMQP.Application.Connection do """ @spec get_server_name(binary | atom) :: binary def get_server_name(name) do - :"amqp_connection_#{name}" + :"#{__MODULE__}::#{name}" end @doc false @@ -64,6 +63,18 @@ defmodule AMQP.Application.Connection do GenServer.call(get_server_name(name), :get_state) end + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + @doc """ Returns a connection referred by the name. """ @@ -107,9 +118,9 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, _pid, reason}, _) do + def handle_info({:DOWN, _, :process, _pid, reason}, state) do # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_lost, reason}, nil} + {:stop, {:connection_gone, reason}, nil} end defp do_open(options) do From 73bd733a733f16276ddafda9f3040044e4fb02cb Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 13:36:42 +0000 Subject: [PATCH 23/29] Test Application.Connection --- lib/amqp/application/channel.ex | 2 +- lib/amqp/application/connection.ex | 30 +++++++++++++++++++++++----- test/application/connection_test.exs | 24 ++++++++++++++++++++++ 3 files changed, 50 insertions(+), 6 deletions(-) create mode 100644 test/application/connection_test.exs diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index fb3e924..3fd19c0 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -123,7 +123,7 @@ defmodule AMQP.Application.Channel do end end - def handle_info({:DOWN, _, :process, _pid, reason}, state) do + def handle_info({:DOWN, _, :process, _pid, reason}, _state) do # Stop GenServer. Will be restarted by Supervisor. {:stop, {:channel_gone, reason}, nil} end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index 5a04450..c504710 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -17,7 +17,7 @@ defmodule AMQP.Application.Connection do Combines name and retry interval with the connection options. iex> opts = [proc_name: :my_conn, retry_interval: 10_000, host: "localhost"] - iex> :ok = AMQP.Application.Connection.start_link(opts) + iex> {:ok, pid} = AMQP.Application.Connection.start_link(opts) iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:my_conn) Passes URL instead of options and use a default proc name when you need only a single connection. @@ -89,6 +89,7 @@ defmodule AMQP.Application.Connection do @impl true def init(state) do send(self(), :connect) + Process.flag(:trap_exit, true) {:ok, state} end @@ -110,7 +111,7 @@ defmodule AMQP.Application.Connection do {:noreply, %{state | connection: conn}} {:error, _} -> - Logger.error("Failed to connect to AMQP server (#{state[:name]}). Retrying later...") + Logger.error("Failed to open AMQP connection (#{state[:name]}). Retrying later...") # Retry later Process.send_after(self(), :connect, state[:retry_interval]) @@ -118,9 +119,28 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, _pid, reason}, state) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_gone, reason}, nil} + def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) when is_pid(pid) do + Logger.info("AMQP connection is gone (#{state[:name]}). Reconnecting...") + send(self(), :connect) + {:noreply, %{state | connection: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state[:connection]) + {:stop, reason, %{state | connection: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state[:connection]) + %{state | connection: nil} + end + + defp close(nil), do: :ok + defp close(connection) do + if Process.alive?(connection.pid) do + Connection.close(connection) + end end defp do_open(options) do diff --git a/test/application/connection_test.exs b/test/application/connection_test.exs new file mode 100644 index 0000000..d6ada4b --- /dev/null +++ b/test/application/connection_test.exs @@ -0,0 +1,24 @@ +defmodule AMQP.Application.ConnectionTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + + test "opens and accesses to connections" do + opts = [proc_name: :my_conn, retry_interval: 10_000, url: "amqp://guest:guest@localhost"] + {:ok, pid} = AppConn.start_link(opts) + + {:ok, conn} = AppConn.get_connection(:my_conn) + assert %AMQP.Connection{} = conn + + Process.exit(pid, :normal) + end + + test "reconnects when the connection is gone" do + {:ok, _pid} = AppConn.start_link([]) + {:ok, %AMQP.Connection{} = conn1} = AppConn.get_connection() + AMQP.Connection.close(conn1) + :timer.sleep(50) + + assert {:ok, %AMQP.Connection{} = conn2} = AppConn.get_connection() + refute conn1 == conn2 + end +end From 6cf8f4b47fb5a5403bdfb5cffb64baaf069d52ca Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 14:11:08 +0000 Subject: [PATCH 24/29] Test channel --- lib/amqp/application/channel.ex | 34 ++++++++++++++++++++++++---- lib/amqp/application/connection.ex | 30 +++++++++++++----------- test/application/channel_test.exs | 34 ++++++++++++++++++++++++++++ test/application/connection_test.exs | 2 +- 4 files changed, 81 insertions(+), 19 deletions(-) create mode 100644 test/application/channel_test.exs diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index 3fd19c0..ee11ae0 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -41,6 +41,7 @@ defmodule AMQP.Application.Channel do retry_interval: retry_interval, connection: connection, name: proc_name, + monitor_ref: nil, channel: nil } @@ -86,6 +87,7 @@ defmodule AMQP.Application.Channel do @impl true def init(state) do send(self(), :open) + Process.flag(:trap_exit, true) {:ok, state} end @@ -104,8 +106,8 @@ defmodule AMQP.Application.Channel do {:ok, conn} -> case Channel.open(conn) do {:ok, chan} -> - Process.monitor(chan.pid) - {:noreply, %{state | channel: chan}} + ref = Process.monitor(chan.pid) + {:noreply, %{state | channel: chan, monitor_ref: ref}} {:error, error} -> Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}") @@ -123,8 +125,30 @@ defmodule AMQP.Application.Channel do end end - def handle_info({:DOWN, _, :process, _pid, reason}, _state) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:channel_gone, reason}, nil} + def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state) + when is_pid(pid) do + Logger.info("AMQP channel is gone (#{state[:name]}). Reopening...") + send(self(), :open) + {:noreply, %{state | channel: nil, monitor_ref: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state) + {:stop, reason, %{state | channel: nil, monitor_ref: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state) + %{state | channel: nil, monitor_ref: nil} end + + defp close(%{channel: %Channel{} = channel, monior_ref: ref}) do + if Process.alive?(channel.pid) do + Process.demonitor(ref) + Channel.close(channel) + end + end + + defp close(_), do: :ok end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index c504710..ca0d3ca 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -44,7 +44,8 @@ defmodule AMQP.Application.Connection do retry_interval: retry_interval, open_arg: open_arg, name: proc_name, - connection: nil + connection: nil, + monitor_ref: nil } {server_name, init_arg} @@ -107,8 +108,8 @@ defmodule AMQP.Application.Connection do case do_open(state[:open_arg]) do {:ok, conn} -> # Get notifications when the connection goes down - Process.monitor(conn.pid) - {:noreply, %{state | connection: conn}} + ref = Process.monitor(conn.pid) + {:noreply, %{state | connection: conn, monitor_ref: ref}} {:error, _} -> Logger.error("Failed to open AMQP connection (#{state[:name]}). Retrying later...") @@ -119,30 +120,33 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) when is_pid(pid) do + def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) + when is_pid(pid) do Logger.info("AMQP connection is gone (#{state[:name]}). Reconnecting...") send(self(), :connect) - {:noreply, %{state | connection: nil}} + {:noreply, %{state | connection: nil, monitor_ref: nil}} end def handle_info({:EXIT, _from, reason}, state) do - close(state[:connection]) - {:stop, reason, %{state | connection: nil}} + close(state) + {:stop, reason, %{state | connection: nil, monitor_ref: nil}} end @impl true def terminate(_reason, state) do - close(state[:connection]) - %{state | connection: nil} + close(state) + %{state | connection: nil, monitor_ref: nil} end - defp close(nil), do: :ok - defp close(connection) do - if Process.alive?(connection.pid) do - Connection.close(connection) + defp close(%{connection: %Connection{} = conn, monior_ref: ref}) do + if Process.alive?(conn.pid) do + Process.demonitor(ref) + Connection.close(conn) end end + defp close(_), do: :ok + defp do_open(options) do if url = options[:url] do Connection.open(url, Keyword.delete(options, :url)) diff --git a/test/application/channel_test.exs b/test/application/channel_test.exs new file mode 100644 index 0000000..7559de5 --- /dev/null +++ b/test/application/channel_test.exs @@ -0,0 +1,34 @@ +defmodule AMQP.Application.ChnnelTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + alias AMQP.Application.Channel, as: AppChan + + setup do + {:ok, app_conn_pid} = AppConn.start_link([]) + + on_exit(fn -> + Process.exit(app_conn_pid, :normal) + end) + + [app_conn: app_conn_pid] + end + + test "opens and accesses channel" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, pid} = AppChan.start_link(opts) + + assert {:ok, %AMQP.Channel{}} = AppChan.get_channel(:test_chan) + Process.exit(pid, :normal) + end + + test "reconnects when the channel is gone" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, _pid} = AppChan.start_link(opts) + {:ok, %AMQP.Channel{} = chan1} = AppChan.get_channel(:test_chan) + AMQP.Channel.close(chan1) + :timer.sleep(50) + + assert {:ok, %AMQP.Channel{} = chan2} = AppChan.get_channel(:test_chan) + refute chan1 == chan2 + end +end diff --git a/test/application/connection_test.exs b/test/application/connection_test.exs index d6ada4b..5efd0aa 100644 --- a/test/application/connection_test.exs +++ b/test/application/connection_test.exs @@ -2,7 +2,7 @@ defmodule AMQP.Application.ConnectionTest do use ExUnit.Case alias AMQP.Application.Connection, as: AppConn - test "opens and accesses to connections" do + test "opens and accesses connections" do opts = [proc_name: :my_conn, retry_interval: 10_000, url: "amqp://guest:guest@localhost"] {:ok, pid} = AppConn.start_link(opts) From 2355bfd21f9a60097f5885fcbd89243291721755 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 16:02:19 +0000 Subject: [PATCH 25/29] Access via AMQP.Application --- lib/amqp/application.ex | 159 ++++++++++++++++++++++++++++- lib/amqp/application/channel.ex | 18 ++-- lib/amqp/application/connection.ex | 8 +- 3 files changed, 177 insertions(+), 8 deletions(-) diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index 34c1d3f..0782627 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -1,5 +1,7 @@ defmodule AMQP.Application do - @moduledoc false + @moduledoc """ + Provides access to configured connections and channels. + """ use Application require Logger @@ -79,4 +81,159 @@ defmodule AMQP.Application do error -> error end end + + @doc """ + Provides an easy way to access an AMQP connection. + + The connection will be monitored by AMQP's GenServer and it will automatically try to reconnect when the connection is gone. + + ## Usage + + When you want to have a single connection in your app: + + config :amqp, connection: [ + url: "amqp://guest:guest@localhost:15672" + ] + + You can also use any options available on `AMQP.Connection.open/2`: + + config :amqp, connection: [ + host: "localhost", + port: 15672 + username: "guest", + password: "guest" + ] + + Then the connection will be open at the start of the application and you can access via this function. + + iex> {:ok, conn} = AMQP.Application.get_connection() + + By default, it tries to connect to your local RabbitMQ. You can simply pass the empty keyword list too: + + config :amqp, connection: [] # == [url: "amqp://0.0.0.0"] + + You can set up multiple connections wth `:connections` key: + + config :amqp, connections: [ + business_report: [ + url: "amqp://host1" + ], + analytics: [ + url: "amqp://host2" + ] + ] + + Then you can access each connection with its name. + + iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics) + + The defaut name is :default so These two configurations are equivalent: + + config :amqp, connection: [] + config :amqp, connections: [default: []] + + ## Configuration options + + * `:retry_interval` - The retry interval in milliseconds when the connection is failed to open (default `5000`) + * `:url` - AMQP URI for the connection + + See also `AMQP.Connection.open/2` for all available options. + """ + @spec get_connection(binary | atom) :: {:ok, AMQP.Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + AMQP.Application.Connection.get_connection(name) + end + + @doc """ + Provides an easy way to access an AMQP channel. + + AMQP.Application provides a wrapper on top of `AMQP.Channel` with . + The channel will be monitored by AMQP's GenServer and it will automatically try to reopen when the channel is gone. + + ## Usage + + When you want to have a single channel in your app: + + config :amqp, + connection: [url: "amqp://guest:guest@localhost:15672"], + channel: [] + + Then the channel will be open at the start of the application and you can access it via this function. + + iex> {:ok, chan} = AMQP.Application.get_channel() + + You can also set up multiple channels wth `:channels` key: + + config :amqp, + connections: [ + business_report: [url: "amqp://host1"], + analytics: [url: "amqp://host2"] + ], + channels: [ + bisiness_report: [connection: :business_report], + analytics: [connection: :analytics] + ] + + Then you can access each channel with its name. + + iex> {:ok, conn1} = AMQP.Application.get_channel(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_channel(:analytics) + + You can also have multiple channels for a single connection. + + config :amqp, + connection: [], + channels: [ + consumer: [], + producer: [] + ] + + ## Configuration options + + * `:connection` - The connection name configured with `connection` or `connections` (default `:default`) + * `:retry_interval` - The retry interval in milliseconds when the channel is failed to open (default `5000`) + + ## Caveat + + Although AMQP will reopen the named channel automatically when it is closed for some reasons, + your application still needs to monitor the channel for a consumer process. + Be aware the channel reponed doesn't automatically recover the subscription of your consumer + + Here is a sample GenServer module that monitors the channel and re-subscribe the channel. + + defmodule AppConsumer do + use GenServer + @channel :default + @queue "myqueue" + + .... + + def handle_info(:subscribe, state) do + subscribe() + {noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, reason}, state) do + send(self(), :subscribe) + {:noreply, state} + end + + defp subscribe() do + case AMQP.Application.get_channel(@channel) do + {:ok, chan} -> + Process.monitor(chan.pid) + AMQP.Basic.consume(@channel, @queue) + + _error -> + Process.send_after(self(), :subscribe, 1000) + {:error, :retrying} + end + end + end + """ + @spec get_channel(binary | atom) :: {:ok, AMQP.Channel.t()} | {:error, any} + def get_channel(name \\ :default) do + AMQP.Application.Channel.get_channel(name) + end end diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index ee11ae0..ddd1939 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -1,9 +1,11 @@ defmodule AMQP.Application.Channel do @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. use GenServer require Logger - alias AMQP.{Channel, Connection} + alias AMQP.Channel @default_interval 5_000 @@ -16,13 +18,13 @@ defmodule AMQP.Application.Channel do iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn] iex> :ok = AMQP.Application.Channel.start_link(opts) - iex> {:ok, chan} = AMQP.Application.Connection.get_connection(:my_chan) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:my_chan) If you omit the proc_name, it uses :default. iex> :ok = AMQP.Application.Channel.start_link([]) - iex> {:ok, chan} = AMQP.Application.Connection.get_channel() - iex> {:ok, chan} = AMQP.Application.Connection.get_channel(:default) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel() + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:default) """ @spec start_link(keyword) :: GenServer.on_start() def start_link(opts) do @@ -76,7 +78,7 @@ defmodule AMQP.Application.Channel do @doc """ Returns a channel referred by the name. """ - @spec get_channel(binary | atom) :: {:ok, Connection.t()} | {:error, any} + @spec get_channel(binary | atom) :: {:ok, Channel.t()} | {:error, any} def get_channel(name \\ :default) do case GenServer.call(get_server_name(name), :get_channel) do nil -> {:error, :channel_not_ready} @@ -97,7 +99,11 @@ defmodule AMQP.Application.Channel do end def handle_call(:get_channel, _, state) do - {:reply, state[:channel], state} + if state[:channel] && Process.alive?(state[:channel].pid) do + {:reply, state[:channel], state} + else + {:reply, nil, state} + end end @impl true diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index ca0d3ca..4557ae7 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -1,5 +1,7 @@ defmodule AMQP.Application.Connection do @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. use GenServer require Logger @@ -100,7 +102,11 @@ defmodule AMQP.Application.Connection do end def handle_call(:get_connection, _, state) do - {:reply, state[:connection], state} + if state[:connection] && Process.alive?(state[:connection].pid) do + {:reply, state[:connection], state} + else + {:reply, nil, state} + end end @impl true From 375cdf71de50ef0aaaca1eab7c027e5bcb6a176e Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 16:21:12 +0000 Subject: [PATCH 26/29] v2.0.0-rc.1 --- README.md | 10 ++++++++-- mix.exs | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7a34a17..4014471 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,13 @@ The API is based on Langohr, a Clojure client for RabbitMQ. ## Migration from 1.X to 2.X -TO BE WRITTEN +TO BE WRITTEN. + +However it will work without any changes for most 1.X users. +The changes are mostly internal or additional. +There is one minor backward incompatible change for `nowait` option but it won't affect most users. + +See [the pull request](https://github.com/pma/amqp/pull/177) for the details and progress. ## Migration from 0.X to 1.X @@ -23,7 +29,7 @@ Add AMQP as a dependency in your `mix.exs` file. ```elixir def deps do [ - {:amqp, "~> 1.6.0"} + {:amqp, "~> 2.0.0-rc.1"} ] end ``` diff --git a/mix.exs b/mix.exs index 1a46b47..854f5d1 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule AMQP.Mixfile do use Mix.Project - @version "2.0.0-pre.0" + @version "2.0.0-rc.1" def project do [ From a57645db1888abe2c82e9cd5d8d849e09fe247ed Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sat, 9 Jan 2021 15:45:56 +0000 Subject: [PATCH 27/29] Unsupport Connection.open/3 --- lib/amqp/connection.ex | 50 +++++----------------------------------- test/connection_test.exs | 7 ------ 2 files changed, 6 insertions(+), 51 deletions(-) diff --git a/lib/amqp/connection.ex b/lib/amqp/connection.ex index 6c6d84b..c30e637 100644 --- a/lib/amqp/connection.ex +++ b/lib/amqp/connection.ex @@ -26,9 +26,9 @@ defmodule AMQP.Connection do """ @spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri_or_options \\ []) when is_binary(uri_or_options) or is_list(uri_or_options) do - open(uri_or_options, :undefined) - end + def open(uri_or_options \\ []) + def open(uri) when is_binary(uri), do: open(uri, []) + def open(options) when is_list(options), do: open("", options) @doc """ Opens an new Connection to an AMQP broker. @@ -92,40 +92,10 @@ defmodule AMQP.Connection do fail_if_no_peer_cert: true ] ) - - ## Backward compatibility for connection name - - RabbitMQ supports user-specified connection names since version 3.6.2. - - Previously AMQP took a connection name as a separate parameter on `open/2` and `open/3` and it is still supported in this version. - - iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"] - iex> AMQP.Connection.open(options, :undefined) - {:ok, %AMQP.Connection{}} - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection") - {:ok, %AMQP.Connection{}} - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection", options) - {:ok, %AMQP.Connection{}} - - However the connection name parameter is now deprecated and might not be supported in the future versions. - You are recommented to pass it with `:name` option instead: - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", name: "my-connection") - {:ok, %AMQP.Connection{}} """ - @spec open(String.t() | keyword, keyword | String.t() | :undefined) :: - {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri, options) - - def open(uri, name) when is_binary(uri) and (is_binary(name) or name == :undefined) do - do_open(uri, name, _options = []) - end - - def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do - {name_from_opts, options} = take_connection_name(options) - name = if name == :undefined, do: name_from_opts, else: name + @spec open(String.t(), keyword) :: {:ok, t()} | {:error, atom()} | {:error, any()} + def open(uri, options) when is_nil(uri) or uri == "" do + {name, options} = take_connection_name(options) options |> merge_options_to_default() @@ -138,14 +108,6 @@ defmodule AMQP.Connection do do_open(uri, name, options) end - @doc false - @deprecated "Use :name in open/2 instead" - @spec open(String.t(), String.t() | :undefined, keyword) :: - {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri, name, options) when is_binary(uri) and is_list(options) do - do_open(uri, name, options) - end - defp do_open(uri, name, options) do case uri |> String.to_charlist() |> :amqp_uri.parse() do {:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name) diff --git a/test/connection_test.exs b/test/connection_test.exs index 8fb9791..be12987 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -59,13 +59,6 @@ defmodule ConnectionTest do assert :ok = Connection.close(conn) end - test "open connection with uri, name, and options (deprected but still spported)" do - assert {:ok, conn} = - Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost') - - assert :ok = Connection.close(conn) - end - test "override uri with options" do uri = "amqp://foo:bar@amqp.test.com:12345" {:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse() From c22c793cbe74d23a16961a78301df29d1fc764a5 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 12 Jan 2021 19:50:53 +0000 Subject: [PATCH 28/29] Update README --- README.md | 128 ++++++++++++++---------------------------------------- 1 file changed, 33 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 4014471..82d90f0 100644 --- a/README.md +++ b/README.md @@ -8,19 +8,12 @@ Simple Elixir wrapper for the Erlang RabbitMQ client. The API is based on Langohr, a Clojure client for RabbitMQ. -## Migration from 1.X to 2.X +## Upgrading guides -TO BE WRITTEN. +If you use old version and plan to migrate to 1.0 please read our upgrade guides: -However it will work without any changes for most 1.X users. -The changes are mostly internal or additional. -There is one minor backward incompatible change for `nowait` option but it won't affect most users. - -See [the pull request](https://github.com/pma/amqp/pull/177) for the details and progress. - -## Migration from 0.X to 1.X - -If you use amqp 0.X and plan to migrate to 1.0 please read our [migration guide](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0). +* [0.x to 1.0](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0) +* [1.x to 2.0](https://github.com/pma/amqp/wiki/2.0-Release-Notes#breaking-changes-and-upgrade-guide) ## Usage @@ -191,73 +184,33 @@ If you want to see more detailed logs, you can enable it by adding the following config :amqp, enable_progress_report: true ``` -### Stable RabbitMQ Connection - -While the above example works, it does nothing to handle RabbitMQ connection -outages. In case of an outage your Genserver will remain stale and won't -receive any messages from the broker as the connection is never restarted. +#### Connections and channels -Luckily, implementing a reconnection logic is quite straight forward. Since the -connection record holds the pid of the connection itself, we can monitor it -and get a notification when it goes down. +You can define a connection and channel in your config and AMQP will automatically... -Example implementation: +* Open the connection and channel at the start of the application +* Automatically try to reconnect if they are disconnected ```elixir -defmodule MyApp.AMQP do - use GenServer - require Logger - alias AMQP.Connection - - @host "amqp://localhost" - @reconnect_interval 10_000 - - def start_link(opts \\ [name: __MODULE__]) do - GenServer.start_link(__MODULE__, nil, opts) - end - - def init(_) do - send(self(), :connect) - {:ok, nil} - end - - def get_connection do - case GenServer.call(__MODULE__, :get) do - nil -> {:error, :not_connected} - conn -> {:ok, conn} - end - end - - def handle_call(:get, _, conn) do - {:reply, conn, conn} - end +config :amqp, + connections: [ + myconn: [url: "amqp://guest:guest@myhost:12345"], + ], + channels: [ + mychan: [connection: :myconn] + ] +``` - def handle_info(:connect, conn) do - case Connection.open(@host) do - {:ok, conn} -> - # Get notifications when the connection goes down - Process.monitor(conn.pid) - {:noreply, conn} - - {:error, _} -> - Logger.error("Failed to connect #{@host}. Reconnecting later...") - # Retry later - Process.send_after(self(), :connect, @reconnect_interval) - {:noreply, nil} - end - end +You can access the connection/channel via `AMQP.Application`. - def handle_info({:DOWN, _, :process, _pid, reason}, _) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_lost, reason}, nil} - end -end +```elixir +iex> {:ok, chan} = AMQP.Application.get_channel(:mychan) +iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello") ``` -Now, when the server starts, it will try to reconnect indefinitely until it succeeds. -When the connection drops or the server is down, the GenServer will stop. -If you have put the GenServer module to your application tree, the Supervisor will automatically restart it. -Then it will try to reconnect indefinitely until it succeeds. +When a channel is dwon and reconnected, you have make sure your consumer subscribes to a channel again. + +See the documentation for `AMQP.Application.get_connection/1` and `AMQP.Application.get_channel/1` for more details. ### Types of arguments and headers @@ -287,35 +240,21 @@ Valid argument names in `Exchange.declare` include: ## Troubleshooting / FAQ -#### Connections and Channels - -If this is your first time using RabbitMQ, we recommend you to start designing your application like this way: - -- Open and manage a single connection for an application -- Open/close a channel per process (don't share a channel between multiple processes) - -Once you saw things in action you can now consider optimising the performance by increasing number of connections etc. - -Note it's completely safe to share a single connection between multiple processes. -However it is not recommended to share a channel between multiple processes. -It's technically possible but you want to understand the implications when you do. - -Make sure you close the channel after used to avoid any potential memory leaks and warnings from RabbitMQ client library. - #### Consumer stops receiving messages It usually happens when your code doesn't send acknowledgement(ack, nack or reject) after receiving a message. -You want to investigate if... -- an exception was raised and how it would be handled -- :exit signal was thrown and how it would be handled -- a message processing took long time. +If you use GenServer for your consumer, try storing number of messages the server is currently processing to the GenServer state. +If the number equals `prefetch_count`, those messages were left without acknowledgements and that's why consumer have stopped receiving more messages. + +Also review the following points: -If you use GenServer in consumer, try storing number of messages the server is -currently processing to the GenServer state. -If the number equals `prefetch_count`, those messages were left without -acknowledgements and that's why consumer have stopped receiving more -messages. +- when an exception was raised how it would be handled +- when :exit signal was thrown how it would be handled +- when a message processing took long time what could happen + +Also make sure that the consumer to monitor the channel. +When the channel is gone, you have to reopen and subscribe to a new channel again. #### The version compatibiliy @@ -336,7 +275,6 @@ Try the following configuration. config :logger, handle_otp_reports: false ``` - #### Lager conflicts with Elixir logger Lager is used by rabbit_common and it is not Elixir's best friend yet. From e3feddd9729450455111b62d536ecc76886e97fb Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 12 Jan 2021 19:56:10 +0000 Subject: [PATCH 29/29] Tweak README --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 82d90f0..99128f3 100644 --- a/README.md +++ b/README.md @@ -208,7 +208,7 @@ iex> {:ok, chan} = AMQP.Application.get_channel(:mychan) iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello") ``` -When a channel is dwon and reconnected, you have make sure your consumer subscribes to a channel again. +When a channel is down and reconnected, you have to make sure your consumer subscribes to a channel again. See the documentation for `AMQP.Application.get_connection/1` and `AMQP.Application.get_channel/1` for more details. @@ -244,8 +244,8 @@ Valid argument names in `Exchange.declare` include: It usually happens when your code doesn't send acknowledgement(ack, nack or reject) after receiving a message. -If you use GenServer for your consumer, try storing number of messages the server is currently processing to the GenServer state. -If the number equals `prefetch_count`, those messages were left without acknowledgements and that's why consumer have stopped receiving more messages. +If you use GenServer for your consumer, try storing the number of messages the server is currently processing to the GenServer state. +If the number equals `prefetch_count`, those messages were left without acknowledgements and that's why the consumer has stopped receiving more messages. Also review the following points: @@ -253,8 +253,8 @@ Also review the following points: - when :exit signal was thrown how it would be handled - when a message processing took long time what could happen -Also make sure that the consumer to monitor the channel. -When the channel is gone, you have to reopen and subscribe to a new channel again. +Also make sure that the consumer monitors the channel pid. +When the channel is gone, you have to reopen it and subscribe to a new channel again. #### The version compatibiliy