Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 2.0.0 #177

Merged
merged 34 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
381c909
coverage tool
ono Dec 12, 2020
784223b
Version 2.0.0-pre.0
ono Dec 12, 2020
82d656c
mix format
ono Dec 24, 2020
35a94b1
Don't cache deps
ono Dec 24, 2020
4ca7593
Port :amqp_client's :amqp_selective_consumer to Elixir
ono Dec 9, 2020
f0ccd4c
mix format
ono Dec 12, 2020
584bac8
Support nowait option
ono Dec 22, 2020
b2d8ebe
Support multiple tags
ono Dec 23, 2020
389cf62
Disable delivery_ctx support
ono Dec 23, 2020
75be928
Support credit_drained
ono Dec 23, 2020
818f45b
Support return/confirm via consumer
ono Dec 23, 2020
1499b77
Bye bye receiver
ono Dec 23, 2020
b7a7a6c
Refactor consumers
ono Dec 23, 2020
5b05b5d
mix format
ono Dec 23, 2020
477da36
Add moduledoc
ono Dec 24, 2020
20cdc31
Handle nowait properly
ono Dec 24, 2020
5d816e2
Doesn't check the reason (it doesn't seem to be consistent)
ono Dec 24, 2020
a180b41
Merge pull request #176 from pma/feature/selective-consumer
ono Dec 24, 2020
fa13360
Accept integer for expiration and DateTime for timestamp
ono Dec 27, 2020
280d846
Merge pull request #178 from pma/feature/convert-option-type
ono Dec 27, 2020
9b3a4ca
Disable Erlang library's pregress report by default
ono Dec 28, 2020
e9ca8db
Merge pull request #179 from pma/feature/disable-progress-report
ono Dec 28, 2020
78e79fa
Provide a function to re-enable the report
ono Dec 29, 2020
b82ffa2
Create a connection from config
ono Dec 29, 2020
1b31d81
Channel management
ono Dec 29, 2020
73bd733
Test Application.Connection
ono Jan 3, 2021
6cf8f4b
Test channel
ono Jan 3, 2021
2355bfd
Access via AMQP.Application
ono Jan 3, 2021
c8789fe
Merge pull request #180 from pma/feature/rich-connection
ono Jan 3, 2021
375cdf7
v2.0.0-rc.1
ono Jan 3, 2021
a57645d
Unsupport Connection.open/3
ono Jan 9, 2021
830f565
Merge pull request #181 from pma/unsupport-open-3
ono Jan 9, 2021
c22c793
Update README
ono Jan 12, 2021
e3feddd
Tweak README
ono Jan 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ jobs:
otp-version: 23.2
elixir-version: 1.11.2

- name: Restore dependencies cache
uses: actions/cache@v2
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-

- name: Install dependencies
run: mix deps.get

Expand Down Expand Up @@ -60,13 +53,6 @@ jobs:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- name: Restore dependencies cache
uses: actions/cache@v2
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-

- name: Install dependencies
run: mix deps.get

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ erl_crash.dump
*.ez
/log
/doc
/cover
141 changes: 46 additions & 95 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ Simple Elixir wrapper for the Erlang RabbitMQ client.

The API is based on Langohr, a Clojure client for RabbitMQ.

## Migration from 0.X to 1.X
## Upgrading guides

If you use amqp 0.X and plan to migrate to 1.0 please read our [migration guide](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0).
If you use old version and plan to migrate to 1.0 please read our upgrade guides:

* [0.x to 1.0](https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0)
* [1.x to 2.0](https://github.com/pma/amqp/wiki/2.0-Release-Notes#breaking-changes-and-upgrade-guide)

## Usage

Expand All @@ -19,7 +22,7 @@ Add AMQP as a dependency in your `mix.exs` file.
```elixir
def deps do
[
{:amqp, "~> 1.6.0"}
{:amqp, "~> 2.0.0-rc.1"}
]
end
```
Expand Down Expand Up @@ -167,75 +170,49 @@ Error converting Hello, World! to integer
Error converting Hello, World! to integer
```

## Stable RabbitMQ Connection

While the above example works, it does nothing to handle RabbitMQ connection
outages. In case of an outage your Genserver will remain stale and won't
receive any messages from the broker as the connection is never restarted.
### Configuration

Luckily, implementing a reconnection logic is quite straight forward. Since the
connection record holds the pid of the connection itself, we can monitor it
and get a notification when it goes down.
#### Erlang library's progress report

Example implementation:
This library uses an official Erlang RabbitMQ client library internally and we found its logging is too verbose.
These are called progress reports by the Erlang library and you would see a lot of entries with info log level if you
use 1.x version.
AMQP disables that by default from version 2.0.
If you want to see more detailed logs, you can enable it by adding the following line on your config.

```elixir
defmodule MyApp.AMQP do
use GenServer
require Logger
alias AMQP.Connection

@host "amqp://localhost"
@reconnect_interval 10_000
config :amqp, enable_progress_report: true
```

def start_link(opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, nil, opts)
end
#### Connections and channels

def init(_) do
send(self(), :connect)
{:ok, nil}
end
You can define a connection and channel in your config and AMQP will automatically...

def get_connection do
case GenServer.call(__MODULE__, :get) do
nil -> {:error, :not_connected}
conn -> {:ok, conn}
end
end
* Open the connection and channel at the start of the application
* Automatically try to reconnect if they are disconnected

def handle_call(:get, _, conn) do
{:reply, conn, conn}
end
```elixir
config :amqp,
connections: [
myconn: [url: "amqp://guest:guest@myhost:12345"],
],
channels: [
mychan: [connection: :myconn]
]
```

def handle_info(:connect, conn) do
case Connection.open(@host) do
{:ok, conn} ->
# Get notifications when the connection goes down
Process.monitor(conn.pid)
{:noreply, conn}

{:error, _} ->
Logger.error("Failed to connect #{@host}. Reconnecting later...")
# Retry later
Process.send_after(self(), :connect, @reconnect_interval)
{:noreply, nil}
end
end
You can access the connection/channel via `AMQP.Application`.

def handle_info({:DOWN, _, :process, _pid, reason}, _) do
# Stop GenServer. Will be restarted by Supervisor.
{:stop, {:connection_lost, reason}, nil}
end
end
```elixir
iex> {:ok, chan} = AMQP.Application.get_channel(:mychan)
iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello")
```

Now, when the server starts, it will try to reconnect indefinitely until it succeeds.
When the connection drops or the server is down, the GenServer will stop.
If you have put the GenServer module to your application tree, the Supervisor will automatically restart it.
Then it will try to reconnect indefinitely until it succeeds.
When a channel is down and reconnected, you have to make sure your consumer subscribes to a channel again.

## Types of arguments and headers
See the documentation for `AMQP.Application.get_connection/1` and `AMQP.Application.get_channel/1` for more details.

### Types of arguments and headers

The parameter `arguments` in `Queue.declare`, `Exchange.declare`, `Basic.consume` and the parameter `headers` in `Basic.publish` are a list of tuples in the form `{name, type, value}`, where `name` is a binary containing the argument/header name, `type` is an atom describing the AMQP field type and `value` a term compatible with the AMQP field type.

Expand Down Expand Up @@ -263,35 +240,21 @@ Valid argument names in `Exchange.declare` include:

## Troubleshooting / FAQ

#### Connections and Channels

If this is your first time using RabbitMQ, we recommend you to start designing your application like this way:

- Open and manage a single connection for an application
- Open/close a channel per process (don't share a channel between multiple processes)

Once you saw things in action you can now consider optimising the performance by increasing number of connections etc.

Note it's completely safe to share a single connection between multiple processes.
However it is not recommended to share a channel between multiple processes.
It's technically possible but you want to understand the implications when you do.

Make sure you close the channel after used to avoid any potential memory leaks and warnings from RabbitMQ client library.

#### Consumer stops receiving messages

It usually happens when your code doesn't send acknowledgement(ack, nack or reject) after receiving a message.
You want to investigate if...

- an exception was raised and how it would be handled
- :exit signal was thrown and how it would be handled
- a message processing took long time.
If you use GenServer for your consumer, try storing the number of messages the server is currently processing to the GenServer state.
If the number equals `prefetch_count`, those messages were left without acknowledgements and that's why the consumer has stopped receiving more messages.

Also review the following points:

If you use GenServer in consumer, try storing number of messages the server is
currently processing to the GenServer state.
If the number equals `prefetch_count`, those messages were left without
acknowledgements and that's why consumer have stopped receiving more
messages.
- when an exception was raised how it would be handled
- when :exit signal was thrown how it would be handled
- when a message processing took long time what could happen

Also make sure that the consumer monitors the channel pid.
When the channel is gone, you have to reopen it and subscribe to a new channel again.

#### The version compatibiliy

Expand All @@ -312,18 +275,6 @@ Try the following configuration.
config :logger, handle_otp_reports: false
```

Or try filtering out the messages at your application start:

```elixir
:logger.add_primary_filter(
:ignore_rabbitmq_progress_reports,
{&:logger_filters.domain/2, {:stop, :equal, [:progress]}}
)
```

See [this comment](https://github.com/pma/amqp/issues/110#issuecomment-442761299) for the
details.

#### Lager conflicts with Elixir logger

Lager is used by rabbit_common and it is not Elixir's best friend yet.
Expand Down
Loading