Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take connection_name from options parameter and make it a new standard #167

Merged
merged 6 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 84 additions & 79 deletions lib/amqp/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ defmodule AMQP.Connection do
@type t :: %Connection{pid: pid}

@doc """
Opens a new connection. `connection_name` can be passed in as an option when given a keyword list.
Opens a new connection.

Behaves exactly like `open(options_or_uri, :undefined)`. See `open/2`.
"""
@spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()}
def open(options \\ [])
Behaves like `open/2` but takes only either AMQP URI or options.

def open(uri) when is_binary(uri) do
open(uri, :undefined)
end
## Examples

def open(options) when is_list(options) do
options
|> Keyword.get_and_update(:connection_name, fn _ -> :pop end)
|> case do
{nil, options} ->
open(options, :undefined)
iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"]
iex> AMQP.Connection.open(options)
{:ok, %AMQP.Connection{}}

{connection_name, options} ->
open(options, connection_name)
end
iex> AMQP.Connection.open("amqp://guest:guest@localhost")
{:ok, %AMQP.Connection{}}

"""
@spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri_or_options \\ []) when is_binary(uri_or_options) or is_list(uri_or_options) do
open(uri_or_options, :undefined)
end

@doc """
Expand All @@ -42,13 +38,6 @@ defmodule AMQP.Connection do
case of a failure. If you need robust connections and channels, use monitors on the returned
connection PID.

This function can be called in three ways:

* with a list of options and a name
* with an AMQP URI and a name
* with an AMQP URI and a list of options (in this case, the options are merged with
the AMQP URI, taking precedence on same keys)

## Options

* `:username` - The name of a user registered with the broker (defaults to `"guest"`);
Expand All @@ -61,100 +50,123 @@ defmodule AMQP.Connection do
* `:heartbeat` - The hearbeat interval in seconds (defaults to `10`);
* `:connection_timeout` - The connection timeout in milliseconds (defaults to `50000`);
* `:ssl_options` - Enable SSL by setting the location to cert files (defaults to `:none`);
* `:client_properties` - A list of extra client properties to be sent to the server, defaults to `[]`;
* `:client_properties` - A list of extra client properties to be sent to the server (defaults to `[])`;
* `:socket_options` - Extra socket options. These are appended to the default options. \
See http://www.erlang.org/doc/man/inet.html#setopts-2 and http://www.erlang.org/doc/man/gen_tcp.html#connect-4 \
for descriptions of the available options.
* `:auth_mechanisms` - A list of authentication of SASL authentication mechanisms to use.
See https://www.rabbitmq.com/access-control.html#mechanisms and https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl
for descriptions of the available options.
for descriptions of the available options;
* `:auth_mechanisms` - A list of authentication of SASL authentication mechanisms to use. \
See https://www.rabbitmq.com/access-control.html#mechanisms and https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl \
for descriptions of the available options;
* `:name` - A human-readable string that will be displayed in the management UI. \
Connection names do not have to be unique and cannot be used as connection identifiers \
(defaults to `:undefined`).

## Examples

iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest", name: "my-conn"]
iex> AMQP.Connection.open(options)
{:ok, %AMQP.Connection{}}

iex> AMQP.Connection.open("amqp://guest:guest@localhost", port: 5673)
{:ok, %AMQP.Connection{}}

## Enabling SSL

To enable SSL, supply the following in the `ssl_options` field:

* `cacertfile` - Specifies the certificates of the root Certificate Authorities that we wish to implicitly trust;
* `certfile` - The client's own certificate in PEM format;
* `keyfile` - The client's private key in PEM format;
* `:cacertfile` - Specifies the certificates of the root Certificate Authorities that we wish to implicitly trust;
* `:certfile` - The client's own certificate in PEM format;
* `:keyfile` - The client's private key in PEM format.

### Example
Here is an example:

```
AMQP.Connection.open port: 5671,
ssl_options: [cacertfile: '/path/to/testca/cacert.pem',
certfile: '/path/to/client/cert.pem',
keyfile: '/path/to/client/key.pem',
# only necessary with intermediate CAs
# depth: 2,
verify: :verify_peer,
fail_if_no_peer_cert: true]
```
iex> AMQP.Connection.open(
port: 5671,
ssl_options: [
cacertfile: '/path/to/testca/cacert.pem',
certfile: '/path/to/client/cert.pem',
keyfile: '/path/to/client/key.pem',
# only necessary with intermediate CAs
# depth: 2,
verify: :verify_peer,
fail_if_no_peer_cert: true
]
)

## Connection name
## Backward compatibility for connection name

RabbitMQ supports user-specified connection names since version 3.6.2.

Connection names are human-readable strings that will be displayed in the management UI.
Connection names do not have to be unique and cannot be used as connection identifiers.

If the name is `:undefined`, the connection is not registered with any name.

## Examples
Previously AMQP took a connection name as a separate parameter on `open/2` and `open/3` and it is still supported in this version.

iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"]
iex> AMQP.Connection.open(options, :undefined)
{:ok, %AMQP.Connection{}}

iex> AMQP.Connection.open("amqp://guest:guest@localhost", port: 5673)
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection")
{:ok, %AMQP.Connection{}}

iex> AMQP.Connection.open("amqp://guest:guest@localhost", "a-connection-with-a-name")
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection", options)
{:ok, %AMQP.Connection{}}

However the connection name parameter is now deprecated and might not be supported in the future versions.
You are recommented to pass it with `:name` option instead:

iex> AMQP.Connection.open("amqp://guest:guest@localhost", name: "my-connection")
{:ok, %AMQP.Connection{}}
"""
@spec open(keyword | String.t(), String.t() | :undefined | keyword) ::
@spec open(String.t() | keyword, keyword | String.t() | :undefined) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(options_or_uri, options_or_name)
def open(uri, options)

def open(uri, name) when is_binary(uri) and (is_binary(name) or name == :undefined) do
open(uri, name, _options = [])
do_open(uri, name, _options = [])
end

def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do
{name_from_opts, options} = take_connection_name(options)
name = if name == :undefined, do: name_from_opts, else: name

options
|> merge_options_to_default()
|> do_open(name)
end

def open(uri, options) when is_binary(uri) and is_list(options) do
open(uri, :undefined, options)
end
{name, options} = take_connection_name(options)

@doc """
Opens a new connection with a name, merging the given AMQP URI and options.

This function opens a new connection by merging the given AMQP URI `uri` and
the given `options` and assigns it the name `name`.

The options in `options` take precedence over the values in `uri`.

See `open/2` for options.

## Examples

iex> AMQP.Connection.open("amqp://guest:guest@localhost", "a-connection-name", port: 5673)
{:ok, %AMQP.Connection{}}
do_open(uri, name, options)
end

"""
@doc false
ono marked this conversation as resolved.
Show resolved Hide resolved
@deprecated "Use :name in open/2 instead"
@spec open(String.t(), String.t() | :undefined, keyword) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri, name, options) when is_binary(uri) and is_list(options) do
do_open(uri, name, options)
end

defp do_open(uri, name, options) do
case uri |> String.to_charlist() |> :amqp_uri.parse() do
{:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name)
error -> error
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
end

# take name from options
defp take_connection_name(options) do
name = options[:name] || :undefined
options = Keyword.delete(options, :name)
{name, options}
end

@doc false
@spec merge_options_to_amqp_params(tuple, keyword) :: tuple
def merge_options_to_amqp_params(amqp_params, options) do
Expand Down Expand Up @@ -220,13 +232,6 @@ defmodule AMQP.Connection do
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
end

defp normalize_ssl_options(options) when is_list(options) do
for {k, v} <- options do
if k in [:cacertfile, :certfile, :keyfile] do
Expand Down
33 changes: 23 additions & 10 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,35 @@ defmodule ConnectionTest do
assert :ok = Connection.close(conn)
end

test "open connection with uri, name, and options" do
test "open connection with uri, port as an integer, and options " do
assert {:ok, conn} =
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')
Connection.open("amqp://nonexistent",
host: 'localhost',
port: 5672
)

assert :ok = Connection.close(conn)
end

test "open connection with uri, name, port as an integer, and options " do
test "open connection with uri, port as a string, and options" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent", "my-connection",
Connection.open("amqp://nonexistent",
host: 'localhost',
port: 5672
port: "5672"
)

assert :ok = Connection.close(conn)
end

test "open connection with uri, name, port as a string, and options " do
test "open connection with name in options" do
assert {:ok, conn} = Connection.open("amqp://localhost", name: "my-connection")
assert get_connection_name(conn) == "my-connection"
assert :ok = Connection.close(conn)
end

test "open connection with uri, name, and options (deprected but still spported)" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent", "my-connection",
host: 'localhost',
port: "5672"
)
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')

assert :ok = Connection.close(conn)
end
Expand All @@ -70,4 +76,11 @@ defmodule ConnectionTest do
assert params[:password] == "bar"
assert params[:host] == 'amqp.test.com'
end

defp get_connection_name(conn) do
params = :amqp_connection.info(conn.pid, [:amqp_params])[:amqp_params]
amqp_params_network(client_properties: props) = params
{_, _, name} = Enum.find(props, fn {key, _type, _value} -> key == "connection_name" end)
name
end
end