diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 81dc711..e749d73 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -15,13 +15,6 @@ jobs: otp-version: 23.2 elixir-version: 1.11.2 - - name: Restore dependencies cache - uses: actions/cache@v2 - with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- - - name: Install dependencies run: mix deps.get @@ -60,13 +53,6 @@ jobs: otp-version: ${{matrix.otp}} elixir-version: ${{matrix.elixir}} - - name: Restore dependencies cache - uses: actions/cache@v2 - with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} - restore-keys: ${{ runner.os }}-mix- - - name: Install dependencies run: mix deps.get diff --git a/.gitignore b/.gitignore index 5f06f30..ef8bad9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ erl_crash.dump *.ez /log /doc +/cover diff --git a/README.md b/README.md index 379f815..99128f3 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,12 @@ Simple Elixir wrapper for the Erlang RabbitMQ client. The API is based on Langohr, a Clojure client for RabbitMQ. -## Migration from 0.X to 1.X +## Upgrading guides -If you use amqp 0.X and plan to migrate to 1.0 please read our [migration guide](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0). +If you use old version and plan to migrate to 1.0 please read our upgrade guides: + +* [0.x to 1.0](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0) +* [1.x to 2.0](https://github.com/pma/amqp/wiki/2.0-Release-Notes#breaking-changes-and-upgrade-guide) ## Usage @@ -19,7 +22,7 @@ Add AMQP as a dependency in your `mix.exs` file. ```elixir def deps do [ - {:amqp, "~> 1.6.0"} + {:amqp, "~> 2.0.0-rc.1"} ] end ``` @@ -167,75 +170,49 @@ Error converting Hello, World! to integer Error converting Hello, World! to integer ``` -## Stable RabbitMQ Connection - -While the above example works, it does nothing to handle RabbitMQ connection -outages. In case of an outage your Genserver will remain stale and won't -receive any messages from the broker as the connection is never restarted. +### Configuration -Luckily, implementing a reconnection logic is quite straight forward. Since the -connection record holds the pid of the connection itself, we can monitor it -and get a notification when it goes down. +#### Erlang library's progress report -Example implementation: +This library uses an official Erlang RabbitMQ client library internally and we found its logging is too verbose. +These are called progress reports by the Erlang library and you would see a lot of entries with info log level if you +use 1.x version. +AMQP disables that by default from version 2.0. +If you want to see more detailed logs, you can enable it by adding the following line on your config. ```elixir -defmodule MyApp.AMQP do - use GenServer - require Logger - alias AMQP.Connection - - @host "amqp://localhost" - @reconnect_interval 10_000 +config :amqp, enable_progress_report: true +``` - def start_link(opts \\ [name: __MODULE__]) do - GenServer.start_link(__MODULE__, nil, opts) - end +#### Connections and channels - def init(_) do - send(self(), :connect) - {:ok, nil} - end +You can define a connection and channel in your config and AMQP will automatically... - def get_connection do - case GenServer.call(__MODULE__, :get) do - nil -> {:error, :not_connected} - conn -> {:ok, conn} - end - end +* Open the connection and channel at the start of the application +* Automatically try to reconnect if they are disconnected - def handle_call(:get, _, conn) do - {:reply, conn, conn} - end +```elixir +config :amqp, + connections: [ + myconn: [url: "amqp://guest:guest@myhost:12345"], + ], + channels: [ + mychan: [connection: :myconn] + ] +``` - def handle_info(:connect, conn) do - case Connection.open(@host) do - {:ok, conn} -> - # Get notifications when the connection goes down - Process.monitor(conn.pid) - {:noreply, conn} - - {:error, _} -> - Logger.error("Failed to connect #{@host}. Reconnecting later...") - # Retry later - Process.send_after(self(), :connect, @reconnect_interval) - {:noreply, nil} - end - end +You can access the connection/channel via `AMQP.Application`. - def handle_info({:DOWN, _, :process, _pid, reason}, _) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_lost, reason}, nil} - end -end +```elixir +iex> {:ok, chan} = AMQP.Application.get_channel(:mychan) +iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello") ``` -Now, when the server starts, it will try to reconnect indefinitely until it succeeds. -When the connection drops or the server is down, the GenServer will stop. -If you have put the GenServer module to your application tree, the Supervisor will automatically restart it. -Then it will try to reconnect indefinitely until it succeeds. +When a channel is down and reconnected, you have to make sure your consumer subscribes to a channel again. -## Types of arguments and headers +See the documentation for `AMQP.Application.get_connection/1` and `AMQP.Application.get_channel/1` for more details. + +### Types of arguments and headers The parameter `arguments` in `Queue.declare`, `Exchange.declare`, `Basic.consume` and the parameter `headers` in `Basic.publish` are a list of tuples in the form `{name, type, value}`, where `name` is a binary containing the argument/header name, `type` is an atom describing the AMQP field type and `value` a term compatible with the AMQP field type. @@ -263,35 +240,21 @@ Valid argument names in `Exchange.declare` include: ## Troubleshooting / FAQ -#### Connections and Channels - -If this is your first time using RabbitMQ, we recommend you to start designing your application like this way: - -- Open and manage a single connection for an application -- Open/close a channel per process (don't share a channel between multiple processes) - -Once you saw things in action you can now consider optimising the performance by increasing number of connections etc. - -Note it's completely safe to share a single connection between multiple processes. -However it is not recommended to share a channel between multiple processes. -It's technically possible but you want to understand the implications when you do. - -Make sure you close the channel after used to avoid any potential memory leaks and warnings from RabbitMQ client library. - #### Consumer stops receiving messages It usually happens when your code doesn't send acknowledgement(ack, nack or reject) after receiving a message. -You want to investigate if... -- an exception was raised and how it would be handled -- :exit signal was thrown and how it would be handled -- a message processing took long time. +If you use GenServer for your consumer, try storing the number of messages the server is currently processing to the GenServer state. +If the number equals `prefetch_count`, those messages were left without acknowledgements and that's why the consumer has stopped receiving more messages. + +Also review the following points: -If you use GenServer in consumer, try storing number of messages the server is -currently processing to the GenServer state. -If the number equals `prefetch_count`, those messages were left without -acknowledgements and that's why consumer have stopped receiving more -messages. +- when an exception was raised how it would be handled +- when :exit signal was thrown how it would be handled +- when a message processing took long time what could happen + +Also make sure that the consumer monitors the channel pid. +When the channel is gone, you have to reopen it and subscribe to a new channel again. #### The version compatibiliy @@ -312,18 +275,6 @@ Try the following configuration. config :logger, handle_otp_reports: false ``` -Or try filtering out the messages at your application start: - -```elixir -:logger.add_primary_filter( - :ignore_rabbitmq_progress_reports, - {&:logger_filters.domain/2, {:stop, :equal, [:progress]}} -) -``` - -See [this comment](https://github.com/pma/amqp/issues/110#issuecomment-442761299) for the -details. - #### Lager conflicts with Elixir logger Lager is used by rabbit_common and it is not Elixir's best friend yet. diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index 2044f5b..0782627 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -1,15 +1,239 @@ defmodule AMQP.Application do - @moduledoc false + @moduledoc """ + Provides access to configured connections and channels. + """ use Application + require Logger @impl true def start(_type, _args) do - children = [ - {AMQP.Channel.ReceiverManager, []} + load_config() + children = load_connections() ++ load_channels() + + opts = [ + strategy: :one_for_one, + name: AMQP.Application, + max_restarts: length(children) * 2, + max_seconds: 1 ] - opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) end + + defp load_config do + unless Application.get_env(:amqp, :enable_progress_report, false) do + disable_progress_report() + end + end + + defp load_connections do + conn = Application.get_env(:amqp, :connection) + conns = Application.get_env(:amqp, :connections, []) + conns = if conn, do: conns ++ [default: conn], else: conns + + Enum.map(conns, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Connection.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Connection, arg}, id: id) + end) + end + + defp load_channels do + chan = Application.get_env(:amqp, :channel) + chans = Application.get_env(:amqp, :channels, []) + chans = if chan, do: chans ++ [default: chan], else: chans + + Enum.map(chans, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Channel.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Channel, arg}, id: id) + end) + end + + @doc """ + Disables the progress report logging from Erlang library. + + The log outputs are very verbose and can contain credentials. + This AMQP library recommends to disable unless you want the information. + """ + @spec disable_progress_report :: :ok | {:error, any} + def disable_progress_report do + :logger.add_primary_filter( + :amqp_ignore_rabbitmq_progress_reports, + {&:logger_filters.domain/2, {:stop, :equal, [:progress]}} + ) + rescue + e -> + Logger.warn("Failed to disable progress report by Erlang library: detail: #{inspect(e)}") + {:error, e} + end + + @doc """ + Enables the progress report logging from Erlang library. + """ + @spec enable_progress_report :: :ok | {:error, any} + def enable_progress_report do + case :logger.remove_primary_filter(:amqp_ignore_rabbitmq_progress_reports) do + :ok -> :ok + # filter already removed + {:error, {:not_found, _}} -> :ok + error -> error + end + end + + @doc """ + Provides an easy way to access an AMQP connection. + + The connection will be monitored by AMQP's GenServer and it will automatically try to reconnect when the connection is gone. + + ## Usage + + When you want to have a single connection in your app: + + config :amqp, connection: [ + url: "amqp://guest:guest@localhost:15672" + ] + + You can also use any options available on `AMQP.Connection.open/2`: + + config :amqp, connection: [ + host: "localhost", + port: 15672 + username: "guest", + password: "guest" + ] + + Then the connection will be open at the start of the application and you can access via this function. + + iex> {:ok, conn} = AMQP.Application.get_connection() + + By default, it tries to connect to your local RabbitMQ. You can simply pass the empty keyword list too: + + config :amqp, connection: [] # == [url: "amqp://0.0.0.0"] + + You can set up multiple connections wth `:connections` key: + + config :amqp, connections: [ + business_report: [ + url: "amqp://host1" + ], + analytics: [ + url: "amqp://host2" + ] + ] + + Then you can access each connection with its name. + + iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics) + + The defaut name is :default so These two configurations are equivalent: + + config :amqp, connection: [] + config :amqp, connections: [default: []] + + ## Configuration options + + * `:retry_interval` - The retry interval in milliseconds when the connection is failed to open (default `5000`) + * `:url` - AMQP URI for the connection + + See also `AMQP.Connection.open/2` for all available options. + """ + @spec get_connection(binary | atom) :: {:ok, AMQP.Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + AMQP.Application.Connection.get_connection(name) + end + + @doc """ + Provides an easy way to access an AMQP channel. + + AMQP.Application provides a wrapper on top of `AMQP.Channel` with . + The channel will be monitored by AMQP's GenServer and it will automatically try to reopen when the channel is gone. + + ## Usage + + When you want to have a single channel in your app: + + config :amqp, + connection: [url: "amqp://guest:guest@localhost:15672"], + channel: [] + + Then the channel will be open at the start of the application and you can access it via this function. + + iex> {:ok, chan} = AMQP.Application.get_channel() + + You can also set up multiple channels wth `:channels` key: + + config :amqp, + connections: [ + business_report: [url: "amqp://host1"], + analytics: [url: "amqp://host2"] + ], + channels: [ + bisiness_report: [connection: :business_report], + analytics: [connection: :analytics] + ] + + Then you can access each channel with its name. + + iex> {:ok, conn1} = AMQP.Application.get_channel(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_channel(:analytics) + + You can also have multiple channels for a single connection. + + config :amqp, + connection: [], + channels: [ + consumer: [], + producer: [] + ] + + ## Configuration options + + * `:connection` - The connection name configured with `connection` or `connections` (default `:default`) + * `:retry_interval` - The retry interval in milliseconds when the channel is failed to open (default `5000`) + + ## Caveat + + Although AMQP will reopen the named channel automatically when it is closed for some reasons, + your application still needs to monitor the channel for a consumer process. + Be aware the channel reponed doesn't automatically recover the subscription of your consumer + + Here is a sample GenServer module that monitors the channel and re-subscribe the channel. + + defmodule AppConsumer do + use GenServer + @channel :default + @queue "myqueue" + + .... + + def handle_info(:subscribe, state) do + subscribe() + {noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, reason}, state) do + send(self(), :subscribe) + {:noreply, state} + end + + defp subscribe() do + case AMQP.Application.get_channel(@channel) do + {:ok, chan} -> + Process.monitor(chan.pid) + AMQP.Basic.consume(@channel, @queue) + + _error -> + Process.send_after(self(), :subscribe, 1000) + {:error, :retrying} + end + end + end + """ + @spec get_channel(binary | atom) :: {:ok, AMQP.Channel.t()} | {:error, any} + def get_channel(name \\ :default) do + AMQP.Application.Channel.get_channel(name) + end end diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex new file mode 100644 index 0000000..ddd1939 --- /dev/null +++ b/lib/amqp/application/channel.ex @@ -0,0 +1,160 @@ +defmodule AMQP.Application.Channel do + @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. + + use GenServer + require Logger + alias AMQP.Channel + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn] + iex> :ok = AMQP.Application.Channel.start_link(opts) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:my_chan) + + If you omit the proc_name, it uses :default. + + iex> :ok = AMQP.Application.Channel.start_link([]) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel() + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + connection = Keyword.get(opts, :connection, proc_name) + + init_arg = %{ + retry_interval: retry_interval, + connection: connection, + name: proc_name, + monitor_ref: nil, + channel: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the channel name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"#{__MODULE__}::#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + + @doc """ + Returns a channel referred by the name. + """ + @spec get_channel(binary | atom) :: {:ok, Channel.t()} | {:error, any} + def get_channel(name \\ :default) do + case GenServer.call(get_server_name(name), :get_channel) do + nil -> {:error, :channel_not_ready} + channel -> {:ok, channel} + end + end + + @impl true + def init(state) do + send(self(), :open) + Process.flag(:trap_exit, true) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_channel, _, state) do + if state[:channel] && Process.alive?(state[:channel].pid) do + {:reply, state[:channel], state} + else + {:reply, nil, state} + end + end + + @impl true + def handle_info(:open, state) do + case AMQP.Application.Connection.get_connection(state[:connection]) do + {:ok, conn} -> + case Channel.open(conn) do + {:ok, chan} -> + ref = Process.monitor(chan.pid) + {:noreply, %{state | channel: chan, monitor_ref: ref}} + + {:error, error} -> + Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}") + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + + _error -> + Logger.error( + "Failed to open an AMQP channel(#{state[:name]}). Connection (#{state[:connection]}) is not ready." + ) + + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state) + when is_pid(pid) do + Logger.info("AMQP channel is gone (#{state[:name]}). Reopening...") + send(self(), :open) + {:noreply, %{state | channel: nil, monitor_ref: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state) + {:stop, reason, %{state | channel: nil, monitor_ref: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state) + %{state | channel: nil, monitor_ref: nil} + end + + defp close(%{channel: %Channel{} = channel, monior_ref: ref}) do + if Process.alive?(channel.pid) do + Process.demonitor(ref) + Channel.close(channel) + end + end + + defp close(_), do: :ok +end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex new file mode 100644 index 0000000..4557ae7 --- /dev/null +++ b/lib/amqp/application/connection.ex @@ -0,0 +1,163 @@ +defmodule AMQP.Application.Connection do + @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. + + use GenServer + require Logger + alias AMQP.Connection + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + It expects options to be a combination of connection args, proc_name and retry_interval. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_conn, retry_interval: 10_000, host: "localhost"] + iex> {:ok, pid} = AMQP.Application.Connection.start_link(opts) + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:my_conn) + + Passes URL instead of options and use a default proc name when you need only a single connection. + + iex> opts = [url: "amqp://guest:guest@localhost"] + iex> :ok = AMQP.Application.Connection.start_link(opts) + iex> {:ok, conn} = AMQP.Application.Connection.get_connection() + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + open_arg = Keyword.drop(opts, [:proc_name, :retry_interval]) + + init_arg = %{ + retry_interval: retry_interval, + open_arg: open_arg, + name: proc_name, + connection: nil, + monitor_ref: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the connection name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"#{__MODULE__}::#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + + @doc """ + Returns a connection referred by the name. + """ + @spec get_connection(binary | atom) :: {:ok, Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + case GenServer.call(get_server_name(name), :get_connection) do + nil -> {:error, :not_connected} + conn -> {:ok, conn} + end + end + + @impl true + def init(state) do + send(self(), :connect) + Process.flag(:trap_exit, true) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_connection, _, state) do + if state[:connection] && Process.alive?(state[:connection].pid) do + {:reply, state[:connection], state} + else + {:reply, nil, state} + end + end + + @impl true + def handle_info(:connect, state) do + case do_open(state[:open_arg]) do + {:ok, conn} -> + # Get notifications when the connection goes down + ref = Process.monitor(conn.pid) + {:noreply, %{state | connection: conn, monitor_ref: ref}} + + {:error, _} -> + Logger.error("Failed to open AMQP connection (#{state[:name]}). Retrying later...") + + # Retry later + Process.send_after(self(), :connect, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) + when is_pid(pid) do + Logger.info("AMQP connection is gone (#{state[:name]}). Reconnecting...") + send(self(), :connect) + {:noreply, %{state | connection: nil, monitor_ref: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state) + {:stop, reason, %{state | connection: nil, monitor_ref: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state) + %{state | connection: nil, monitor_ref: nil} + end + + defp close(%{connection: %Connection{} = conn, monior_ref: ref}) do + if Process.alive?(conn.pid) do + Process.demonitor(ref) + Connection.close(conn) + end + end + + defp close(_), do: :ok + + defp do_open(options) do + if url = options[:url] do + Connection.open(url, Keyword.delete(options, :url)) + else + Connection.open(options) + end + end +end diff --git a/lib/amqp/basic.ex b/lib/amqp/basic.ex index 48c3fa3..9495c82 100644 --- a/lib/amqp/basic.ex +++ b/lib/amqp/basic.ex @@ -4,8 +4,7 @@ defmodule AMQP.Basic do """ import AMQP.Core - alias AMQP.{Channel, Utils} - alias AMQP.Channel.ReceiverManager + alias AMQP.{Channel, Utils, SelectiveConsumer} @type error :: {:error, reason :: :blocked | :closing} @@ -74,9 +73,9 @@ defmodule AMQP.Basic do priority: Keyword.get(options, :priority, :undefined), correlation_id: Keyword.get(options, :correlation_id, :undefined), reply_to: Keyword.get(options, :reply_to, :undefined), - expiration: Keyword.get(options, :expiration, :undefined), + expiration: Keyword.get(options, :expiration, :undefined) |> number_to_s(), message_id: Keyword.get(options, :message_id, :undefined), - timestamp: Keyword.get(options, :timestamp, :undefined), + timestamp: Keyword.get(options, :timestamp, :undefined) |> to_epoch(), type: Keyword.get(options, :type, :undefined), user_id: Keyword.get(options, :user_id, :undefined), app_id: Keyword.get(options, :app_id, :undefined), @@ -306,7 +305,7 @@ defmodule AMQP.Basic do * `{:basic_consume_ok, %{consumer_tag: consumer_tag}}` - Sent when the consumer \ process is registered with Basic.consume. The caller receives the same information \ as the return of Basic.consume; - * `{:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}` - Sent by the \ + * `{:basic_cancel, %{consumer_tag: consumer_tag, nowait: nowait}}` - Sent by the \ broker when the consumer is unexpectedly cancelled (such as after a queue deletion) * `{:basic_cancel_ok, %{consumer_tag: consumer_tag}}` - Sent to the consumer process after a call to Basic.cancel @@ -325,7 +324,7 @@ defmodule AMQP.Basic do * `:exclusive` - If set, requests exclusive consumer access, meaning that only this consumer can consume from the given `queue`. Note that the client cannot have exclusive access to a queue that already has consumers. - * `:no_wait` - If set, the consume operation is asynchronous. Defaults to + * `:nowait` - If set, the consume operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when consuming (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -333,6 +332,9 @@ defmodule AMQP.Basic do """ @spec consume(Channel.t(), String.t(), pid | nil, keyword) :: {:ok, String.t()} | error def consume(%Channel{} = chan, queue, consumer_pid \\ nil, options \\ []) do + nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false) + consumer_tag = Keyword.get(options, :consumer_tag, "") + basic_consume = basic_consume( queue: queue, @@ -340,7 +342,7 @@ defmodule AMQP.Basic do no_local: Keyword.get(options, :no_local, false), no_ack: Keyword.get(options, :no_ack, false), exclusive: Keyword.get(options, :exclusive, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) @@ -350,10 +352,10 @@ defmodule AMQP.Basic do # https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_selective_consumer.erl # # It acts like a broker and distributes the messages to the process registered with :amqp_channel.subscribe/3. - # AMQP also provides another broker (Receiver/ReceiverManager) that transfors a message from Erlang record + # AMQP also provides another broker (DirectConsumer/SelectiveConsumer) that transfors a message from Erlang record # to Elixir friendly type and forwards the message to the process passed to this method. # - # [RabbitMQ] -> [Channel = SelectiveConsumer] -> [AMQP.Channel.Receiver] -> [consumer_pid] + # [RabbitMQ] -> [Channel] -> [SelectiveConsumer] -> [consumer_pid] # # If custom_consumer is set when the channel is open, the message handling is up to the consumer implementation. # @@ -361,20 +363,18 @@ defmodule AMQP.Basic do # pid = case chan.custom_consumer do - nil -> - %{pid: pid} = - ReceiverManager.register_handler(chan.pid, consumer_pid || self(), :consume) - - pid + {SelectiveConsumer, _} -> + consumer_pid || self() _ -> # when channel has a custom consumer, leave it to handle the given pid with `#handle_consume` callback. consumer_pid end - case :amqp_channel.subscribe(chan.pid, basic_consume, pid) do - basic_consume_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag} - error -> {:error, error} + case {nowait, :amqp_channel.subscribe(chan.pid, basic_consume, pid)} do + {true, :ok} -> {:ok, consumer_tag} + {_, basic_consume_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} + {_, error} -> {:error, error} end end @@ -390,18 +390,17 @@ defmodule AMQP.Basic do ## Options - * `:no_wait` - If set, the cancel operation is asynchronous. Defaults to - `false`. - + * `:nowait` - If set, the cancel operation is asynchronous. Defaults to `false`. """ @spec cancel(Channel.t(), String.t(), keyword) :: {:ok, String.t()} | error def cancel(%Channel{pid: pid}, consumer_tag, options \\ []) do - basic_cancel = - basic_cancel(consumer_tag: consumer_tag, nowait: Keyword.get(options, :no_wait, false)) + nowait = Keyword.get(options, :no_wait, false) || Keyword.get(options, :nowait, false) + basic_cancel = basic_cancel(consumer_tag: consumer_tag, nowait: nowait) - case :amqp_channel.call(pid, basic_cancel) do - basic_cancel_ok(consumer_tag: consumer_tag) -> {:ok, consumer_tag} - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, basic_cancel)} do + {true, :ok} -> {:ok, consumer_tag} + {_, basic_cancel_ok(consumer_tag: consumer_tag)} -> {:ok, consumer_tag} + {_, error} -> {:error, error} end end @@ -411,9 +410,8 @@ defmodule AMQP.Basic do The registered process will receive `{:basic_return, payload, meta}` tuples. """ @spec return(Channel.t(), pid) :: :ok - def return(%Channel{pid: pid}, return_handler_pid) do - receiver = ReceiverManager.register_handler(pid, return_handler_pid, :return) - :amqp_channel.register_return_handler(pid, receiver.pid) + def return(%Channel{} = chan, return_handler_pid) do + :amqp_channel.call_consumer(chan.pid, {:register_return_handler, chan, return_handler_pid}) end @doc """ @@ -422,8 +420,12 @@ defmodule AMQP.Basic do """ @spec cancel_return(Channel.t()) :: :ok def cancel_return(%Channel{pid: pid}) do - # Currently we don't remove the receiver. - # The receiver will be deleted automatically when channel is closed. :amqp_channel.unregister_return_handler(pid) end + + defp number_to_s(value) when is_number(value), do: to_string(value) + defp number_to_s(value), do: value + + defp to_epoch(%DateTime{} = value), do: DateTime.to_unix(value) + defp to_epoch(value), do: value end diff --git a/lib/amqp/channel.ex b/lib/amqp/channel.ex index c1d370a..c98880c 100644 --- a/lib/amqp/channel.ex +++ b/lib/amqp/channel.ex @@ -3,7 +3,7 @@ defmodule AMQP.Channel do Functions to operate on Channels. """ - alias AMQP.{Connection, Channel} + alias AMQP.{Connection, Channel, SelectiveConsumer} defstruct [:conn, :pid, :custom_consumer] @type t :: %Channel{conn: Connection.t(), pid: pid, custom_consumer: custom_consumer() | nil} @@ -18,7 +18,7 @@ defmodule AMQP.Channel do custom consumer. """ @spec open(Connection.t(), custom_consumer | nil) :: {:ok, Channel.t()} | {:error, any} - def open(%Connection{} = conn, custom_consumer \\ nil) do + def open(%Connection{} = conn, custom_consumer \\ {SelectiveConsumer, self()}) do do_open_channel(conn, custom_consumer) end diff --git a/lib/amqp/channel/receiver.ex b/lib/amqp/channel/receiver.ex deleted file mode 100644 index a90aa82..0000000 --- a/lib/amqp/channel/receiver.ex +++ /dev/null @@ -1,219 +0,0 @@ -defmodule AMQP.Channel.Receiver do - @moduledoc false - - import AMQP.Core - alias AMQP.Channel.ReceiverManager - - @doc """ - Handles channel messages. - """ - @spec handle_message(pid(), pid(), map()) :: no_return - def handle_message(chan_pid, client_pid, handlers) do - receive do - {:DOWN, _ref, :process, _pid, _reason} -> - ReceiverManager.unregister_receiver(chan_pid, client_pid) - - {:EXIT, _ref, _reason} -> - ReceiverManager.unregister_receiver(chan_pid, client_pid) - - {:add_handler, handler, opts} -> - new_handlers = add_handler(handlers, handler, opts) - handle_message(chan_pid, client_pid, new_handlers) - - msg -> - with true <- Process.alive?(client_pid), - new_handlers <- do_handle_message(client_pid, handlers, msg), - size when size > 0 <- map_size(new_handlers) do - handle_message(chan_pid, client_pid, new_handlers) - else - _ -> ReceiverManager.unregister_receiver(chan_pid, client_pid) - end - end - end - - defp add_handler(handlers, :consume, opts) do - if opts[:tag] do - consumer_tags = (handlers[:consume] || []) ++ [opts[:tag]] - Map.put(handlers, :consume, consumer_tags |> Enum.uniq()) - else - handlers - end - end - - defp add_handler(handlers, handler, _) do - Map.put(handlers, handler, true) - end - - defp remove_handler(handlers, :consume, opts) do - if handlers[:consume] && opts[:tag] do - consumer_tags = List.delete(handlers[:consume], opts[:tag]) - - if length(consumer_tags) == 0 do - Map.delete(handlers, :consume) - else - Map.put(handlers, :consume, consumer_tags) - end - else - handlers - end - end - - defp remove_handler(handlers, handler, _) do - Map.delete(handlers, handler) - end - - # -- Confirm.register_handler - - defp do_handle_message( - client_pid, - handlers, - basic_ack(delivery_tag: delivery_tag, multiple: multiple) - ) do - send(client_pid, {:basic_ack, delivery_tag, multiple}) - handlers - end - - defp do_handle_message( - client_pid, - handlers, - basic_nack(delivery_tag: delivery_tag, multiple: multiple) - ) do - send(client_pid, {:basic_nack, delivery_tag, multiple}) - handlers - end - - # -- Basic.consume - - defp do_handle_message(client_pid, handlers, basic_consume_ok(consumer_tag: consumer_tag)) do - send(client_pid, {:basic_consume_ok, %{consumer_tag: consumer_tag}}) - add_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message(client_pid, handlers, basic_cancel_ok(consumer_tag: consumer_tag)) do - send(client_pid, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}) - remove_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message( - client_pid, - handlers, - basic_cancel(consumer_tag: consumer_tag, nowait: no_wait) - ) do - send(client_pid, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) - remove_handler(handlers, :consume, tag: consumer_tag) - end - - defp do_handle_message(client_pid, handlers, { - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - }) do - send( - client_pid, - {:basic_deliver, payload, - %{ - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) - - handlers - end - - defp do_handle_message(client_pid, handlers, { - basic_return( - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ) - }) do - send( - client_pid, - {:basic_return, payload, - %{ - reply_code: reply_code, - reply_text: reply_text, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) - - handlers - end -end diff --git a/lib/amqp/channel/receiver_manager.ex b/lib/amqp/channel/receiver_manager.ex deleted file mode 100644 index 3562001..0000000 --- a/lib/amqp/channel/receiver_manager.ex +++ /dev/null @@ -1,128 +0,0 @@ -defmodule AMQP.Channel.ReceiverManager do - @moduledoc false - - # Manages receivers. - # - # AMQP relays messages from a channel to a client and convert - # [Record](https://hexdocs.pm/elixir/Record.html) to the data structure which - # is familiar with Elixir developer (Tuple, Map). - # We call the processes in between a channel and a client Receiver and - # this module manages them. - # - # This module ensures to have a single Receiver per a channel and a client. - # With that way, the sequence of the messages from channel would be always - # preserved. - - use GenServer - alias AMQP.Channel.Receiver - - @enforce_keys [:pid, :channel, :client] - @type t :: %__MODULE__{ - pid: pid(), - channel: pid(), - client: pid() - } - defstruct [:pid, :channel, :client] - - @doc false - @spec start_link(map) :: GenServer.on_start() - def start_link(ops \\ []) - def start_link([]), do: start_link(name: __MODULE__) - - def start_link(opts) do - GenServer.start_link(__MODULE__, %{}, opts) - end - - @impl true - def init(state = %{}) do - {:ok, state} - end - - @doc """ - Returns a receiver pid for the channel and client. - """ - @spec get_receiver(pid(), pid()) :: t() | nil - def get_receiver(channel, client) do - GenServer.call(__MODULE__, {:get, channel, client}) - end - - @doc """ - Unregisters a receiver from GenServer. - """ - @spec unregister_receiver(pid(), pid()) :: :ok - def unregister_receiver(channel, client) do - GenServer.call(__MODULE__, {:unregister, channel, client}) - end - - @doc """ - Registers a receiver pid for the channel and client. - """ - @spec register_handler(pid(), pid(), atom(), keyword()) :: t() | nil - def register_handler(channel, client, handler, opts \\ []) do - GenServer.call(__MODULE__, {:register_handler, channel, client, handler, opts}) - end - - @impl true - def handle_call({:get, channel, client}, _from, receivers) do - key = get_key(channel, client) - {:reply, Map.get(receivers, key), receivers} - end - - def handle_call({:unregister, channel, client}, _from, receivers) do - key = get_key(channel, client) - {:reply, :ok, Map.delete(receivers, key)} - end - - def handle_call({:register_handler, channel, client, handler, opts}, _from, receivers) do - receiver = - receivers - |> get_or_spawn_receiver(channel, client) - |> add_handler(handler, opts) - - key = get_key(channel, client) - {:reply, receiver, Map.put(receivers, key, receiver)} - end - - defp get_receiver(receivers, channel, client) do - key = get_key(channel, client) - - if (receiver = receivers[key]) && Process.alive?(receiver.pid) do - receiver - else - nil - end - end - - defp get_or_spawn_receiver(receivers, channel, client) do - if receiver = get_receiver(receivers, channel, client) do - receiver - else - spawn_receiver(channel, client) - end - end - - defp add_handler(receiver, handler, opts) do - send(receiver.pid, {:add_handler, handler, opts}) - receiver - end - - defp spawn_receiver(channel, client) do - receiver_pid = - spawn(fn -> - Process.flag(:trap_exit, true) - Process.monitor(channel) - Process.monitor(client) - Receiver.handle_message(channel, client, %{}) - end) - - %__MODULE__{ - pid: receiver_pid, - channel: channel, - client: client - } - end - - defp get_key(channel, client) do - "#{inspect(channel)}-#{inspect(client)}" - end -end diff --git a/lib/amqp/confirm.ex b/lib/amqp/confirm.ex index 72f84c1..ea3058a 100644 --- a/lib/amqp/confirm.ex +++ b/lib/amqp/confirm.ex @@ -5,7 +5,6 @@ defmodule AMQP.Confirm do import AMQP.Core alias AMQP.{Basic, Channel} - alias AMQP.Channel.ReceiverManager @doc """ Activates publishing confirmations on the channel. @@ -71,9 +70,8 @@ defmodule AMQP.Confirm do see https://www.rabbitmq.com/confirms.html """ @spec register_handler(Channel.t(), pid) :: :ok - def register_handler(%Channel{pid: chan_pid}, handler_pid) do - receiver = ReceiverManager.register_handler(chan_pid, handler_pid, :confirm) - :amqp_channel.register_confirm_handler(chan_pid, receiver.pid) + def register_handler(%Channel{} = chan, handler_pid) do + :amqp_channel.call_consumer(chan.pid, {:register_confirm_handler, chan, handler_pid}) end @doc """ diff --git a/lib/amqp/connection.ex b/lib/amqp/connection.ex index 6c6d84b..c30e637 100644 --- a/lib/amqp/connection.ex +++ b/lib/amqp/connection.ex @@ -26,9 +26,9 @@ defmodule AMQP.Connection do """ @spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri_or_options \\ []) when is_binary(uri_or_options) or is_list(uri_or_options) do - open(uri_or_options, :undefined) - end + def open(uri_or_options \\ []) + def open(uri) when is_binary(uri), do: open(uri, []) + def open(options) when is_list(options), do: open("", options) @doc """ Opens an new Connection to an AMQP broker. @@ -92,40 +92,10 @@ defmodule AMQP.Connection do fail_if_no_peer_cert: true ] ) - - ## Backward compatibility for connection name - - RabbitMQ supports user-specified connection names since version 3.6.2. - - Previously AMQP took a connection name as a separate parameter on `open/2` and `open/3` and it is still supported in this version. - - iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"] - iex> AMQP.Connection.open(options, :undefined) - {:ok, %AMQP.Connection{}} - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection") - {:ok, %AMQP.Connection{}} - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection", options) - {:ok, %AMQP.Connection{}} - - However the connection name parameter is now deprecated and might not be supported in the future versions. - You are recommented to pass it with `:name` option instead: - - iex> AMQP.Connection.open("amqp://guest:guest@localhost", name: "my-connection") - {:ok, %AMQP.Connection{}} """ - @spec open(String.t() | keyword, keyword | String.t() | :undefined) :: - {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri, options) - - def open(uri, name) when is_binary(uri) and (is_binary(name) or name == :undefined) do - do_open(uri, name, _options = []) - end - - def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do - {name_from_opts, options} = take_connection_name(options) - name = if name == :undefined, do: name_from_opts, else: name + @spec open(String.t(), keyword) :: {:ok, t()} | {:error, atom()} | {:error, any()} + def open(uri, options) when is_nil(uri) or uri == "" do + {name, options} = take_connection_name(options) options |> merge_options_to_default() @@ -138,14 +108,6 @@ defmodule AMQP.Connection do do_open(uri, name, options) end - @doc false - @deprecated "Use :name in open/2 instead" - @spec open(String.t(), String.t() | :undefined, keyword) :: - {:ok, t()} | {:error, atom()} | {:error, any()} - def open(uri, name, options) when is_binary(uri) and is_list(options) do - do_open(uri, name, options) - end - defp do_open(uri, name, options) do case uri |> String.to_charlist() |> :amqp_uri.parse() do {:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name) diff --git a/lib/amqp/consumer_helper.ex b/lib/amqp/consumer_helper.ex new file mode 100644 index 0000000..40abf16 --- /dev/null +++ b/lib/amqp/consumer_helper.ex @@ -0,0 +1,140 @@ +defmodule AMQP.ConsumerHelper do + @moduledoc false + + import AMQP.Core + + @doc false + def compose_message(method, message \\ :undefined) + + def compose_message(basic_consume_ok() = method, _message) do + body = method |> basic_consume_ok() |> Enum.into(%{}) + {:basic_consume_ok, body} + end + + def compose_message(basic_cancel_ok() = method, _message) do + body = method |> basic_cancel_ok() |> Enum.into(%{}) + {:basic_cancel_ok, body} + end + + def compose_message(basic_cancel() = method, _message) do + body = method |> basic_cancel() |> Enum.into(%{}) + {:basic_cancel, body} + end + + def compose_message(basic_credit_drained() = method, _message) do + body = method |> basic_credit_drained() |> Enum.into(%{}) + {:basic_credit_drained, body} + end + + def compose_message( + basic_deliver( + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do + {:basic_deliver, payload, + %{ + consumer_tag: consumer_tag, + delivery_tag: delivery_tag, + redelivered: redelivered, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + def compose_message( + basic_return( + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key + ), + amqp_msg( + props: + p_basic( + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + delivery_mode: delivery_mode, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + ), + payload: payload + ) + ) do + {:basic_return, payload, + %{ + reply_code: reply_code, + reply_text: reply_text, + exchange: exchange, + routing_key: routing_key, + content_type: content_type, + content_encoding: content_encoding, + headers: headers, + persistent: delivery_mode == 2, + priority: priority, + correlation_id: correlation_id, + reply_to: reply_to, + expiration: expiration, + message_id: message_id, + timestamp: timestamp, + type: type, + user_id: user_id, + app_id: app_id, + cluster_id: cluster_id + }} + end + + def compose_message(basic_ack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_ack, delivery_tag, multiple} + end + + def compose_message(basic_nack(delivery_tag: delivery_tag, multiple: multiple), _message) do + {:basic_nack, delivery_tag, multiple} + end +end diff --git a/lib/amqp/core.ex b/lib/amqp/core.ex index 5c68a42..a7302aa 100644 --- a/lib/amqp/core.ex +++ b/lib/amqp/core.ex @@ -201,6 +201,12 @@ defmodule AMQP.Core do Record.extract(:"basic.nack", from_lib: "rabbit_common/include/rabbit_framing.hrl") ) + Record.defrecord( + :basic_credit_drained, + :"basic.credit_drained", + Record.extract(:"basic.credit_drained", from_lib: "rabbit_common/include/rabbit_framing.hrl") + ) + Record.defrecord( :confirm_select, :"confirm.select", diff --git a/lib/amqp/direct_consumer.ex b/lib/amqp/direct_consumer.ex index b2a1a94..254dd1d 100644 --- a/lib/amqp/direct_consumer.ex +++ b/lib/amqp/direct_consumer.ex @@ -16,6 +16,7 @@ defmodule AMQP.DirectConsumer do For more information see: https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation """ import AMQP.Core + import AMQP.ConsumerHelper @behaviour :amqp_gen_consumer ######################################################### @@ -35,85 +36,32 @@ defmodule AMQP.DirectConsumer do end @impl true - def handle_consume_ok(basic_consume_ok(consumer_tag: consumer_tag), _args, consumer) do - _ = send(consumer, {:basic_consume_ok, %{consumer_tag: consumer_tag}}) + def handle_consume_ok(method, _args, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do - _ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) + def handle_cancel(method, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_cancel_ok(basic_cancel_ok(consumer_tag: consumer_tag), _args, consumer) do - _ = send(consumer, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}) + def handle_cancel_ok(method, _args, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_server_cancel(basic_cancel(consumer_tag: consumer_tag, nowait: no_wait), consumer) do - _ = send(consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}) + def handle_server_cancel(method, consumer) do + send(consumer, compose_message(method)) {:ok, consumer} end @impl true - def handle_deliver( - basic_deliver( - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key - ), - amqp_msg( - props: - p_basic( - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - delivery_mode: delivery_mode, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - ), - payload: payload - ), - consumer - ) do - send( - consumer, - {:basic_deliver, payload, - %{ - consumer_tag: consumer_tag, - delivery_tag: delivery_tag, - redelivered: redelivered, - exchange: exchange, - routing_key: routing_key, - content_type: content_type, - content_encoding: content_encoding, - headers: headers, - persistent: delivery_mode == 2, - priority: priority, - correlation_id: correlation_id, - reply_to: reply_to, - expiration: expiration, - message_id: message_id, - timestamp: timestamp, - type: type, - user_id: user_id, - app_id: app_id, - cluster_id: cluster_id - }} - ) + def handle_deliver(method, message, consumer) do + send(consumer, compose_message(method, message)) {:ok, consumer} end @@ -135,11 +83,41 @@ defmodule AMQP.DirectConsumer do end def handle_info(down = {:DOWN, _, _, _, _}, consumer) do - _ = send(consumer, down) + send(consumer, down) + {:ok, consumer} + end + + def handle_info({basic_return() = method, message}, consumer) do + send(consumer, compose_message(method, message)) + + {:ok, consumer} + end + + def handle_info(basic_ack() = method, consumer) do + send(consumer, compose_message(method)) + + {:ok, consumer} + end + + def handle_info(basic_nack() = method, consumer) do + send(consumer, compose_message(method)) + {:ok, consumer} end @impl true + def handle_call({:register_return_handler, chan, consumer}, _from, consumer) do + :amqp_channel.register_return_handler(chan.pid, self()) + + {:reply, :ok, consumer} + end + + def handle_call({:register_confirm_handler, chan, consumer}, _from, consumer) do + :amqp_channel.register_confirm_handler(chan.pid, self()) + + {:reply, :ok, consumer} + end + def handle_call(_req, _from, consumer) do {:reply, {:error, :undefined}, consumer} end diff --git a/lib/amqp/exchange.ex b/lib/amqp/exchange.ex index b1d6c5d..672795e 100644 --- a/lib/amqp/exchange.ex +++ b/lib/amqp/exchange.ex @@ -25,7 +25,7 @@ defmodule AMQP.Exchange do * `:auto_delete` - If set, deletes the Exchange once all queues unbind from it; * `:passive` - If set, returns an error if the Exchange does not already exist; * `:internal` - If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -33,6 +33,8 @@ defmodule AMQP.Exchange do """ @spec declare(Channel.t(), Basic.exchange(), type :: atom, keyword) :: :ok | Basic.error() def declare(%Channel{pid: pid}, exchange, type \\ :direct, options \\ []) do + nowait = get_nowait(options) + exchange_declare = exchange_declare( exchange: exchange, @@ -41,13 +43,14 @@ defmodule AMQP.Exchange do durable: Keyword.get(options, :durable, false), auto_delete: Keyword.get(options, :auto_delete, false), internal: Keyword.get(options, :internal, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_declare) do - exchange_declare_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_declare)} do + {true, :ok} -> :ok + {_, exchange_declare_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -59,22 +62,25 @@ defmodule AMQP.Exchange do * `:if_unused` - If set, the server will only delete the exchange if it has no queue bindings. - * `:no_wait` - If set, the delete operation is asynchronous. Defaults to + * `:nowait` - If set, the delete operation is asynchronous. Defaults to `false`. """ @spec delete(Channel.t(), Basic.exchange(), keyword) :: :ok | Basic.error() def delete(%Channel{pid: pid}, exchange, options \\ []) do + nowait = get_nowait(options) + exchange_delete = exchange_delete( exchange: exchange, if_unused: Keyword.get(options, :if_unused, false), - nowait: Keyword.get(options, :no_wait, false) + nowait: nowait ) - case :amqp_channel.call(pid, exchange_delete) do - exchange_delete_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_delete)} do + {true, :ok} -> :ok + {_, exchange_delete_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -85,7 +91,7 @@ defmodule AMQP.Exchange do ## Options * `:routing_key` - the routing key to use for the binding. Defaults to `""`. - * `:no_wait` - If set, the bind operation is asynchronous. Defaults to + * `:nowait` - If set, the bind operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when binding (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -94,18 +100,21 @@ defmodule AMQP.Exchange do @spec bind(Channel.t(), destination :: String.t(), source :: String.t(), keyword) :: :ok | Basic.error() def bind(%Channel{pid: pid}, destination, source, options \\ []) do + nowait = get_nowait(options) + exchange_bind = exchange_bind( destination: destination, source: source, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_bind) do - exchange_bind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_bind)} do + {true, :ok} -> :ok + {_, exchange_bind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -116,7 +125,7 @@ defmodule AMQP.Exchange do ## Options * `:routing_key` - the routing key to use for the binding. Defaults to `""`. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. @@ -125,18 +134,21 @@ defmodule AMQP.Exchange do @spec unbind(Channel.t(), destination :: String.t(), source :: String.t(), keyword) :: :ok | Basic.error() def unbind(%Channel{pid: pid}, destination, source, options \\ []) do + nowait = get_nowait(options) + exchange_unbind = exchange_unbind( destination: destination, source: source, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) ) - case :amqp_channel.call(pid, exchange_unbind) do - exchange_unbind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, exchange_unbind)} do + {true, :ok} -> :ok + {_, exchange_unbind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -175,4 +187,9 @@ defmodule AMQP.Exchange do def topic(%Channel{} = channel, exchange, options \\ []) do declare(channel, exchange, :topic, options) end + + # support backward compatibility with old key name + defp get_nowait(opts) do + Keyword.get(opts, :nowait, false) || Keyword.get(opts, :no_wait, false) + end end diff --git a/lib/amqp/queue.ex b/lib/amqp/queue.ex index d69d527..d9c51d8 100644 --- a/lib/amqp/queue.ex +++ b/lib/amqp/queue.ex @@ -23,14 +23,16 @@ defmodule AMQP.Queue do Defaults to `false`. * `:passive` - If set, raises an error unless the queue already exists. Defaults to `false`. - * `:no_wait` - If set, the declare operation is asynchronous. Defaults to + * `:nowait` - If set, the declare operation is asynchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when declaring (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. """ - @spec declare(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | Basic.error() + @spec declare(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | :ok | Basic.error() def declare(%Channel{pid: pid}, queue \\ "", options \\ []) do + nowait = get_nowait(options) + queue_declare = queue_declare( queue: queue, @@ -38,19 +40,23 @@ defmodule AMQP.Queue do durable: Keyword.get(options, :durable, false), exclusive: Keyword.get(options, :exclusive, false), auto_delete: Keyword.get(options, :auto_delete, false), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) |> Utils.to_type_tuple() ) - case :amqp_channel.call(pid, queue_declare) do - queue_declare_ok( - queue: queue, - message_count: message_count, - consumer_count: consumer_count - ) -> + case {nowait, :amqp_channel.call(pid, queue_declare)} do + {true, :ok} -> + :ok + + {_, + queue_declare_ok( + queue: queue, + message_count: message_count, + consumer_count: consumer_count + )} -> {:ok, %{queue: queue, message_count: message_count, consumer_count: consumer_count}} - error -> + {_, error} -> {:error, error} end end @@ -62,25 +68,28 @@ defmodule AMQP.Queue do * `:routing_key` - The routing key used to bind the queue to the exchange. Defaults to `""`. - * `:no_wait` - If `true`, the binding is not synchronous. Defaults to `false`. + * `:nowait` - If `true`, the binding is not synchronous. Defaults to `false`. * `:arguments` - A list of arguments to pass when binding (of type `t:AMQP.arguments/0`). See the README for more information. Defaults to `[]`. """ @spec bind(Channel.t(), Basic.queue(), Basic.exchange(), keyword) :: :ok | Basic.error() def bind(%Channel{pid: pid}, queue, exchange, options \\ []) do + nowait = get_nowait(options) + queue_bind = queue_bind( queue: queue, exchange: exchange, routing_key: Keyword.get(options, :routing_key, ""), - nowait: Keyword.get(options, :no_wait, false), + nowait: nowait, arguments: Keyword.get(options, :arguments, []) |> Utils.to_type_tuple() ) - case :amqp_channel.call(pid, queue_bind) do - queue_bind_ok() -> :ok - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, queue_bind)} do + {true, :ok} -> :ok + {_, queue_bind_ok()} -> :ok + {_, error} -> {:error, error} end end @@ -119,22 +128,25 @@ defmodule AMQP.Queue do consumers. If the queue has consumers, it's not deleted and an error is returned. * `:if_empty` - If set, the server will only delete the queue if it has no messages. - * `:no_wait` - If set, the delete operation is asynchronous. + * `:nowait` - If set, the delete operation is asynchronous. """ - @spec delete(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | Basic.error() + @spec delete(Channel.t(), Basic.queue(), keyword) :: {:ok, map} | :ok | Basic.error() def delete(%Channel{pid: pid}, queue, options \\ []) do + nowait = get_nowait(options) + queue_delete = queue_delete( queue: queue, if_unused: Keyword.get(options, :if_unused, false), if_empty: Keyword.get(options, :if_empty, false), - nowait: Keyword.get(options, :no_wait, false) + nowait: nowait ) - case :amqp_channel.call(pid, queue_delete) do - queue_delete_ok(message_count: message_count) -> {:ok, %{message_count: message_count}} - error -> {:error, error} + case {nowait, :amqp_channel.call(pid, queue_delete)} do + {true, :ok} -> :ok + {_, queue_delete_ok(message_count: message_count)} -> {:ok, %{message_count: message_count}} + {_, error} -> {:error, error} end end @@ -228,7 +240,7 @@ defmodule AMQP.Queue do do_consume(channel, fun, consumer_tag) - {:basic_cancel, %{consumer_tag: ^consumer_tag, no_wait: _}} -> + {:basic_cancel, %{consumer_tag: ^consumer_tag}} -> exit(:basic_cancel) {:basic_cancel_ok, %{consumer_tag: ^consumer_tag}} -> @@ -245,4 +257,9 @@ defmodule AMQP.Queue do def unsubscribe(%Channel{} = channel, consumer_tag) do Basic.cancel(channel, consumer_tag) end + + # support backward compatibility with old key name + defp get_nowait(opts) do + Keyword.get(opts, :nowait, false) || Keyword.get(opts, :no_wait, false) + end end diff --git a/lib/amqp/selective_consumer.ex b/lib/amqp/selective_consumer.ex new file mode 100644 index 0000000..3384559 --- /dev/null +++ b/lib/amqp/selective_consumer.ex @@ -0,0 +1,304 @@ +defmodule AMQP.SelectiveConsumer do + @moduledoc """ + This is an Elixir reimplementation of `:amqp_selective_consumer` - [source](https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/amqp_client/src/amqp_selective_consumer.erl) + + The module is used by default when you open a channel via `AMQP.Channel.open/2` and allows you + to end consumer processes via `Basic.consume/4` and receive messages from a queue. + + Usually you don't have to pay attention to this module as the interaction would be made through + `AMQP.Channel.open/2`, `Basic.consume/4`, `Basic.return/2`, `Confirm.register_handler/2` etc. + """ + + import AMQP.Core + import AMQP.ConsumerHelper + alias AMQP.{Channel, SelectiveConsumer} + @behaviour :amqp_gen_consumer + + defstruct consumers: %{}, + unassigned: :undefined, + monitors: %{}, + default_consumer: :none, + return_handler: :none, + confirm_handler: :none + + @type t :: %SelectiveConsumer{ + consumers: %{String.t() => pid}, + unassigned: pid | :undefined, + monitors: %{pid => {integer, reference}}, + default_consumer: pid | :none, + return_handler: pid | :none + } + + @doc """ + Ported from :amqp_selective_consumer.register_default_consumer/2. + + This function registers a default consumer with the channel. + A default consumer is used when a subscription is made via + amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than {@module}:subscribe/3) and + hence there is no consumer pid registered with the consumer tag. In this case, the relevant + deliveries will be sent to the default consumer. + """ + @spec register_default_consumer(Channel.t(), pid) :: :ok + def register_default_consumer(%Channel{pid: pid}, consumer_pid) do + :amqp_channel.call_consumer(pid, {:register_default_consumer, consumer_pid}) + end + + @impl true + def init(_state) do + {:ok, %SelectiveConsumer{}} + end + + @impl true + def handle_consume(basic_consume(consumer_tag: tag, nowait: nowait), pid, status) do + result = + case nowait do + true when tag == :undefined or is_nil(tag) or byte_size(tag) == 0 -> + :no_consumer_tag_specified + + _ when is_binary(tag) and byte_size(tag) >= 0 -> + case resolve_consumer(tag, status) do + {:consumer, _} -> :consumer_tag_in_use + _ -> :ok + end + + _ -> + :ok + end + + case {result, nowait} do + {:ok, true} -> + c = Map.put(status.consumers, tag, pid) + m = add_to_monitors(status.monitors, pid) + {:ok, %{status | consumers: c, monitors: m}} + + {:ok, false} -> + {:ok, %{status | unassigned: pid}} + + {error, true} -> + {:error, error, status} + + {_error, false} -> + # Don't do anything (don't override existing consumers), the server will close the channel with an error. + {:ok, status} + end + end + + @impl true + def handle_consume_ok( + basic_consume_ok(consumer_tag: tag) = consume_ok, + _consume, + %{unassigned: pid} = status + ) + when is_pid(pid) do + c = Map.put(status.consumers, tag, pid) + m = add_to_monitors(status.monitors, pid) + + status = %{status | consumers: c, monitors: m, unassigned: :undefined} + {:ok, %{status | consumers: c, monitors: m}} + + deliver(consume_ok, status) + + {:ok, status} + end + + @impl true + def handle_cancel(basic_cancel(nowait: true), %{default_consumer: :none}) do + exit(:cancel_nowait_requires_default_consumer) + end + + def handle_cancel(basic_cancel(nowait: nowait) = cancel, status) do + case nowait do + true -> {:ok, do_cancel(cancel, status)} + false -> {:ok, status} + end + end + + defp do_cancel(cancel, status) do + tag = tag(cancel) + + case Map.fetch(status.consumers, tag) do + {:ok, consumer} -> + c = Map.delete(status.consumers, tag) + m = remove_from_monitors(status.monitors, consumer) + %{status | consumers: c, monitors: m} + + _error -> + # untracked consumer + status + end + end + + @impl true + def handle_cancel_ok(basic_cancel_ok() = cancel_ok, _cancel, status) do + new_status = do_cancel(cancel_ok, status) + # use old status + deliver(cancel_ok, status) + + {:ok, new_status} + end + + @impl true + def handle_server_cancel(basic_cancel(nowait: true) = cancel, status) do + new_status = do_cancel(cancel, status) + # use old status + deliver(cancel, status) + + {:ok, new_status} + end + + @impl true + def handle_deliver(method, message, status) do + deliver(method, message, status) + {:ok, status} + end + + @impl true + def handle_deliver(_method, _message, _delivery_ctx, _status) do + # The handler is called with delivery_ctx for direct connection. + # Since the library is not supporting direct connection, returns an error. + # + # deliver(method, message, delivery_ctx, status) + # {:ok, status} + + {:error, :undefined} + end + + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, status) do + m = Map.delete(status.monitors, pid) + d = if status.default_consumer == pid, do: :none, else: status.default_consumer + c = status.consumers |> Enum.reject(fn {_, v} -> v == pid end) |> Map.new() + + {:ok, %{status | consumers: c, monitors: m, default_consumer: d}} + end + + def handle_info(basic_credit_drained() = method, status) do + deliver(method, status) + {:ok, status} + end + + def handle_info({basic_return() = method, message}, %{return_handler: pid} = status) + when is_pid(pid) do + composed = compose_message(method, message) + send(pid, composed) + + {:ok, status} + end + + def handle_info(basic_ack() = method, %{confirm_handler: pid} = status) when is_pid(pid) do + composed = compose_message(method, :undefined) + send(pid, composed) + + {:ok, status} + end + + def handle_info(basic_nack() = method, %{confirm_handler: pid} = status) when is_pid(pid) do + composed = compose_message(method, :undefined) + send(pid, composed) + + {:ok, status} + end + + @impl true + def handle_call({:register_default_consumer, pid}, _from, status) do + m = + if is_pid(status.default_consumer) do + remove_from_monitors(status.monitors, status.default_consumer) + else + status.monitors + end + |> add_to_monitors(pid) + + {:reply, :ok, %{status | monitors: m, default_consumer: pid}} + end + + def handle_call({:register_return_handler, chan, handler_pid}, _from, status) do + :amqp_channel.register_return_handler(chan.pid, self()) + + {:reply, :ok, %{status | return_handler: handler_pid}} + end + + def handle_call({:register_confirm_handler, chan, handler_pid}, _from, status) do + :amqp_channel.register_confirm_handler(chan.pid, self()) + + {:reply, :ok, %{status | confirm_handler: handler_pid}} + end + + @impl true + def terminate(_reason, _status) do + :ok + end + + defp deliver(method, status) do + deliver(method, :undefined, status) + end + + defp deliver(method, message, status) do + tag = tag(method) + composed = compose_message(method, message) + deliver_to_consumer_or_die(tag, composed, status) + end + + # delivery_ctx support is yet to come. + # + # defp deliver(method, message, delivery_ctx, status) do + # tag = tag(method) + # composed = + # method + # |> compose_message(message) + # |> Tuple.append(delivery_ctx) + # + # deliver_to_consumer_or_die(tag, composed, status) + # end + + defp deliver_to_consumer_or_die(tag, message, status) do + case resolve_consumer(tag, status) do + {:consumer, pid} -> send(pid, message) + {:default, pid} -> send(pid, message) + _error -> exit(:unexpected_delivery_and_no_default_consumer) + end + end + + # AMQP original: convert Erlang record to map + + defp resolve_consumer(tag, %{consumers: consumers, default_consumer: default}) do + case Map.fetch(consumers, tag) do + {:ok, pid} -> + {:consumer, pid} + + :error when is_pid(default) -> + {:default, default} + + _ -> + :error + end + end + + defp add_to_monitors(monitors, pid) do + case Map.fetch(monitors, pid) do + :error -> + Map.put(monitors, pid, {1, :erlang.monitor(:process, pid)}) + + {:ok, {count, mref}} -> + Map.put(monitors, pid, {count + 1, mref}) + end + end + + defp remove_from_monitors(monitors, pid) do + case Map.fetch(monitors, pid) do + {:ok, {1, mref}} -> + :erlang.demonitor(mref) + Map.delete(monitors, pid) + + {:ok, {count, mref}} -> + Map.put(monitors, pid, {count - 1, mref}) + end + end + + defp tag(basic_consume(consumer_tag: tag)), do: tag + defp tag(basic_consume_ok(consumer_tag: tag)), do: tag + defp tag(basic_cancel(consumer_tag: tag)), do: tag + defp tag(basic_cancel_ok(consumer_tag: tag)), do: tag + defp tag(basic_deliver(consumer_tag: tag)), do: tag + defp tag(basic_credit_drained(consumer_tag: tag)), do: tag +end diff --git a/mix.exs b/mix.exs index 5d27b0e..854f5d1 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule AMQP.Mixfile do use Mix.Project - @version "1.6.0" + @version "2.0.0-rc.1" def project do [ @@ -22,13 +22,20 @@ defmodule AMQP.Mixfile do main: "readme", source_ref: "v#{@version}", source_url: "https://github.com/pma/amqp" + ], + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [ + coveralls: :test, + "coveralls.detail": :test, + "coveralls.post": :test, + "coveralls.html": :test ] ] end def application do [ - applications: [:lager, :amqp_client], + applications: [:lager, :amqp_client, :logger], mod: {AMQP.Application, []} ] end @@ -43,7 +50,8 @@ defmodule AMQP.Mixfile do {:inch_ex, "~> 0.5", only: :docs}, # Dev dependencies. - {:dialyxir, "~> 0.5", only: :dev, runtime: false} + {:dialyxir, "~> 0.5", only: :dev, runtime: false}, + {:excoveralls, "~> 0.10", only: :test} ] end diff --git a/mix.lock b/mix.lock index 8cb8a6e..7e91f4c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,18 +1,28 @@ %{ "amqp_client": {:hex, :amqp_client, "3.8.9", "42fb24cfa606f87f77e84604c1ad896c26c236228196a294a07f88f8d2982695", [:make, :rebar3], [{:rabbit_common, "3.8.9", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "8ad70be479d2b8c7882aa2908245b490960fe5c619d2edd56eb24f90e1132b60"}, + "certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "2.2.0", "f8040672ff9644ccaefc40ffb8ec33ed80ac77df84ac5f206a1c079ca2aacc9d", [:rebar3], [], "hexpm", "52dd8585a2123e6e259253a7f4f849f1584404bd25cdfaf18247e198065c601e"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"}, "ex_doc": {:hex, :ex_doc, "0.22.1", "9bb6d51508778193a4ea90fa16eac47f8b67934f33f8271d5e1edec2dc0eee4c", [:mix], [{:earmark, "~> 1.4.0", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "d957de1b75cb9f78d3ee17820733dc4460114d8b1e11f7ee4fd6546e69b1db60"}, + "excoveralls": {:hex, :excoveralls, "0.13.3", "edc5f69218f84c2bf61b3609a22ddf1cec0fbf7d1ba79e59f4c16d42ea4347ed", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cc26f48d2f68666380b83d8aafda0fffc65dafcc8d8650358e0b61f6a99b1154"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"}, + "hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"}, + "idna": {:hex, :idna, "6.0.1", "1d038fb2e7668ce41fbf681d2c45902e52b3cb9e9c77b55334353b222c2ee50c", [:rebar3], [{:unicode_util_compat, "0.5.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a02c8a1c4fd601215bb0b0324c8a6986749f807ce35f25449ec9e69758708122"}, "inch_ex": {:hex, :inch_ex, "0.5.6", "418357418a553baa6d04eccd1b44171936817db61f4c0840112b420b8e378e67", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "7123ca0450686a61416a06cd38e26af18fd0f8c1cff5214770a957c6e0724338"}, + "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "jsx": {:hex, :jsx, "2.11.0", "08154624050333919b4ac1b789667d5f4db166dc50e190c4d778d1587f102ee0", [:rebar3], [], "hexpm", "eed26a0d04d217f9eecefffb89714452556cf90eb38f290a27a4d45b9988f8c0"}, "lager": {:hex, :lager, "3.8.0", "3402b9a7e473680ca179fc2f1d827cab88dd37dd1e6113090c6f45ef05228a1c", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "f6cb541b688eab60730d8d286eb77256a5a9ad06eac10d43beaf55d07e68bbb6"}, "makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"}, + "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"}, "rabbit_common": {:hex, :rabbit_common, "3.8.9", "5a80cb825013ada01921db6f20143295eff8c43284b96a4a40e52576397c9322", [:make, :rebar3], [{:credentials_obfuscation, "2.2.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "2.11.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.8.0", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "e8138d2cf3fcef8e1c37f5e2c0fd39ca052b1fe98d21036508ed58f7edece33d"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "recon": {:hex, :recon, "2.5.1", "430ffa60685ac1efdfb1fe4c97b8767c92d0d92e6e7c3e8621559ba77598678a", [:mix, :rebar3], [], "hexpm", "5721c6b6d50122d8f68cccac712caa1231f97894bab779eff5ff0f886cb44648"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"}, } diff --git a/test/application/channel_test.exs b/test/application/channel_test.exs new file mode 100644 index 0000000..7559de5 --- /dev/null +++ b/test/application/channel_test.exs @@ -0,0 +1,34 @@ +defmodule AMQP.Application.ChnnelTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + alias AMQP.Application.Channel, as: AppChan + + setup do + {:ok, app_conn_pid} = AppConn.start_link([]) + + on_exit(fn -> + Process.exit(app_conn_pid, :normal) + end) + + [app_conn: app_conn_pid] + end + + test "opens and accesses channel" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, pid} = AppChan.start_link(opts) + + assert {:ok, %AMQP.Channel{}} = AppChan.get_channel(:test_chan) + Process.exit(pid, :normal) + end + + test "reconnects when the channel is gone" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, _pid} = AppChan.start_link(opts) + {:ok, %AMQP.Channel{} = chan1} = AppChan.get_channel(:test_chan) + AMQP.Channel.close(chan1) + :timer.sleep(50) + + assert {:ok, %AMQP.Channel{} = chan2} = AppChan.get_channel(:test_chan) + refute chan1 == chan2 + end +end diff --git a/test/application/connection_test.exs b/test/application/connection_test.exs new file mode 100644 index 0000000..5efd0aa --- /dev/null +++ b/test/application/connection_test.exs @@ -0,0 +1,24 @@ +defmodule AMQP.Application.ConnectionTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + + test "opens and accesses connections" do + opts = [proc_name: :my_conn, retry_interval: 10_000, url: "amqp://guest:guest@localhost"] + {:ok, pid} = AppConn.start_link(opts) + + {:ok, conn} = AppConn.get_connection(:my_conn) + assert %AMQP.Connection{} = conn + + Process.exit(pid, :normal) + end + + test "reconnects when the connection is gone" do + {:ok, _pid} = AppConn.start_link([]) + {:ok, %AMQP.Connection{} = conn1} = AppConn.get_connection() + AMQP.Connection.close(conn1) + :timer.sleep(50) + + assert {:ok, %AMQP.Connection{} = conn2} = AppConn.get_connection() + refute conn1 == conn2 + end +end diff --git a/test/basic_test.exs b/test/basic_test.exs index e6579d3..b32fa52 100644 --- a/test/basic_test.exs +++ b/test/basic_test.exs @@ -2,7 +2,6 @@ defmodule BasicTest do use ExUnit.Case alias AMQP.{Basic, Connection, Channel, Queue} - alias Channel.ReceiverManager setup do {:ok, conn} = Connection.open() @@ -104,15 +103,7 @@ defmodule BasicTest do Channel.close(meta[:chan]) Process.flag(:trap_exit, true) - assert {:normal, _} = catch_exit(Basic.cancel(meta[:chan], consumer_tag)) - end - - test "removes a receiver when queue does not exist", meta do - catch_exit(Basic.consume(meta[:chan], "non-existent-queue")) - - :timer.sleep(100) - receiver = ReceiverManager.get_receiver(meta[:chan].pid, self()) - refute receiver + assert catch_exit(Basic.cancel(meta[:chan], consumer_tag)) end end end diff --git a/test/channel/receiver_test.exs b/test/channel/receiver_test.exs deleted file mode 100644 index 330d7da..0000000 --- a/test/channel/receiver_test.exs +++ /dev/null @@ -1,87 +0,0 @@ -defmodule AMQP.Channel.ReceiverTest do - use ExUnit.Case - - alias AMQP.{Basic, Connection, Channel, Queue} - alias Channel.ReceiverManager - - setup do - {:ok, conn} = Connection.open() - {:ok, chan} = Channel.open(conn) - {:ok, %{queue: queue}} = Queue.declare(chan) - - on_exit(fn -> - Queue.delete(chan, queue) - :ok = Channel.close(chan) - :ok = Connection.close(conn) - end) - - {:ok, conn: conn, chan: chan, queue: queue} - end - - test "closes the receiver when channel is closed", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - :ok = Channel.close(chan) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - refute Process.alive?(receiver.pid) - end - - test "closes the receiver when the client is closed", meta do - {:ok, chan} = Channel.open(meta.conn) - - task = - Task.async(fn -> - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - ReceiverManager.get_receiver(chan.pid, self()) - end) - - receiver = Task.await(task) - :timer.sleep(100) - refute Process.alive?(task.pid) - refute Process.alive?(receiver.pid) - refute ReceiverManager.get_receiver(chan.pid, task.pid) - - :ok = Channel.close(chan) - end - - test "closes the receiver when all handlers are cancelled", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - {:ok, ^consumer_tag} = Basic.cancel(chan, consumer_tag) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - refute Process.alive?(receiver.pid) - - :ok = Channel.close(chan) - end - - test "unregisters the receiver when the register process dies", meta do - {:ok, chan} = Channel.open(meta.conn) - - {:ok, consumer_tag} = Basic.consume(chan, meta.queue) - assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}} - - receiver = ReceiverManager.get_receiver(chan.pid, self()) - assert Process.alive?(receiver.pid) - - Process.exit(receiver.pid, :normal) - :timer.sleep(100) - refute ReceiverManager.get_receiver(chan.pid, self()) - - :ok = Channel.close(chan) - end -end diff --git a/test/connection_test.exs b/test/connection_test.exs index 8fb9791..be12987 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -59,13 +59,6 @@ defmodule ConnectionTest do assert :ok = Connection.close(conn) end - test "open connection with uri, name, and options (deprected but still spported)" do - assert {:ok, conn} = - Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost') - - assert :ok = Connection.close(conn) - end - test "override uri with options" do uri = "amqp://foo:bar@amqp.test.com:12345" {:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse() diff --git a/test/direct_consumer_test.exs b/test/direct_consumer_test.exs index b24af97..07ed02c 100644 --- a/test/direct_consumer_test.exs +++ b/test/direct_consumer_test.exs @@ -14,7 +14,7 @@ defmodule DirectConsumerTest do send(receiver_pid, :stop) end) - {:ok, conn: conn, chan: chan} + {:ok, conn: conn, chan: chan, receiver_pid: receiver_pid} end def simple_receiver(pid) do @@ -33,7 +33,7 @@ defmodule DirectConsumerTest do end test "basic return", meta do - :ok = Basic.return(meta[:chan], self()) + :ok = Basic.return(meta[:chan], meta[:receiver_pid]) exchange = "" routing_key = "non-existent-queue" diff --git a/test/queue_test.exs b/test/queue_test.exs index a15ff8f..0c0fb35 100644 --- a/test/queue_test.exs +++ b/test/queue_test.exs @@ -20,6 +20,11 @@ defmodule QueueTest do assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) end + test "delare queue with nowait option", meta do + assert :ok = Queue.declare(meta[:chan], "hello", nowait: true) + assert :ok = Queue.delete(meta[:chan], "hello", nowait: true) + end + test "declare queue with explicitly assigned name", meta do name = rand_name() @@ -43,6 +48,20 @@ defmodule QueueTest do assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) end + test "bind with nowait option", meta do + queue = rand_name() + exchange = rand_name() + assert :ok = Exchange.fanout(meta[:chan], exchange) + + assert {:ok, %{queue: ^queue, message_count: 0, consumer_count: 0}} = + Queue.declare(meta[:chan], queue) + + assert :ok = Queue.bind(meta[:chan], queue, exchange, nowait: true) + assert :ok = Queue.unbind(meta[:chan], queue, exchange) + assert :ok = Exchange.delete(meta[:chan], exchange) + assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue) + end + test "status returns message count and consumer count", meta do queue = rand_name() diff --git a/test/selective_consumer_test.exs b/test/selective_consumer_test.exs new file mode 100644 index 0000000..bf7fe57 --- /dev/null +++ b/test/selective_consumer_test.exs @@ -0,0 +1,107 @@ +defmodule AMQP.SelectiveConsumerTest do + use ExUnit.Case + + import AMQP.Core + alias AMQP.{Basic, Connection, Channel, Queue, SelectiveConsumer} + + defmodule InspectConsumer do + def message_loop do + receive do + :stop -> + :ok + + m -> + IO.inspect(m) + message_loop() + end + end + end + + setup do + {:ok, conn} = Connection.open() + {:ok, chan} = Channel.open(conn) + {:ok, %{queue: queue}} = Queue.declare(chan) + default_consumer = spawn(&InspectConsumer.message_loop/0) + SelectiveConsumer.register_default_consumer(chan, default_consumer) + + on_exit(fn -> + if Process.alive?(chan.pid) do + Queue.delete(chan, queue) + send(default_consumer, :stop) + Channel.close(chan) + end + + :ok = Connection.close(conn) + end) + + {:ok, conn: conn, chan: chan, queue: queue} + end + + describe "with nowait option" do + test "Basic.consume", meta do + consumer_tag = "hello" + opts = [nowait: true, consumer_tag: consumer_tag] + {:ok, ^consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self(), opts) + refute_receive {:basic_consume_ok, _} + + Basic.publish(meta[:chan], "", meta[:queue], "hi") + assert_receive {:basic_deliver, "hi", %{consumer_tag: ^consumer_tag}} + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag) + end + + test "Basic.cancel", meta do + {:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue], self()) + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag, nowait: true) + refute_receive {:basic_cancel_ok, _} + end + end + + describe "with multiple queues" do + setup meta do + {:ok, %{queue: queue2}} = Queue.declare(meta[:chan]) + + on_exit(fn -> + Queue.delete(meta[:chan], queue2) + end) + + {:ok, queue2: queue2} + end + + test "does not delete the consumer until all gone", meta do + {:ok, consumer_tag1} = Basic.consume(meta[:chan], meta[:queue]) + {:ok, consumer_tag2} = Basic.consume(meta[:chan], meta[:queue2]) + + {:ok, ^consumer_tag1} = Basic.cancel(meta[:chan], consumer_tag1) + + Basic.publish(meta[:chan], "", meta[:queue2], "hi") + assert_receive {:basic_deliver, "hi", %{consumer_tag: ^consumer_tag2}} + + Basic.publish(meta[:chan], "", meta[:queue], "hola") + refute_receive {:basic_deliver, "hola", %{consumer_tag: ^consumer_tag1}} + + {:ok, ^consumer_tag2} = Basic.cancel(meta[:chan], consumer_tag2) + end + end + + describe "basic_credit_drained" do + test "forwards the message to the end consumer", meta do + {:ok, consumer_tag} = Basic.consume(meta[:chan], meta[:queue]) + + # don't know how we can emit the message from the server so use send/2 + # to emulate the message + msg = basic_credit_drained(consumer_tag: consumer_tag) + + consumer_pid = + meta[:chan].pid + |> :sys.get_state() + |> elem(3) + + send(consumer_pid, msg) + assert_receive {:basic_credit_drained, %{consumer_tag: ^consumer_tag}} + + {:ok, ^consumer_tag} = Basic.cancel(meta[:chan], consumer_tag) + end + end +end