Skip to content

Commit

Permalink
Merge pull request #167 from pma/feature/open-with-connection-name
Browse files Browse the repository at this point in the history
Take connection_name from options parameter and make it a new standard
  • Loading branch information
ono authored Jun 19, 2020
2 parents cc6491d + 6d0c606 commit 824ec4d
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 89 deletions.
163 changes: 84 additions & 79 deletions lib/amqp/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ defmodule AMQP.Connection do
@type t :: %Connection{pid: pid}

@doc """
Opens a new connection. `connection_name` can be passed in as an option when given a keyword list.
Opens a new connection.
Behaves exactly like `open(options_or_uri, :undefined)`. See `open/2`.
"""
@spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()}
def open(options \\ [])
Behaves like `open/2` but takes only either AMQP URI or options.
def open(uri) when is_binary(uri) do
open(uri, :undefined)
end
## Examples
def open(options) when is_list(options) do
options
|> Keyword.get_and_update(:connection_name, fn _ -> :pop end)
|> case do
{nil, options} ->
open(options, :undefined)
iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"]
iex> AMQP.Connection.open(options)
{:ok, %AMQP.Connection{}}
{connection_name, options} ->
open(options, connection_name)
end
iex> AMQP.Connection.open("amqp://guest:guest@localhost")
{:ok, %AMQP.Connection{}}
"""
@spec open(keyword | String.t()) :: {:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri_or_options \\ []) when is_binary(uri_or_options) or is_list(uri_or_options) do
open(uri_or_options, :undefined)
end

@doc """
Expand All @@ -42,13 +38,6 @@ defmodule AMQP.Connection do
case of a failure. If you need robust connections and channels, use monitors on the returned
connection PID.
This function can be called in three ways:
* with a list of options and a name
* with an AMQP URI and a name
* with an AMQP URI and a list of options (in this case, the options are merged with
the AMQP URI, taking precedence on same keys)
## Options
* `:username` - The name of a user registered with the broker (defaults to `"guest"`);
Expand All @@ -61,100 +50,123 @@ defmodule AMQP.Connection do
* `:heartbeat` - The hearbeat interval in seconds (defaults to `10`);
* `:connection_timeout` - The connection timeout in milliseconds (defaults to `50000`);
* `:ssl_options` - Enable SSL by setting the location to cert files (defaults to `:none`);
* `:client_properties` - A list of extra client properties to be sent to the server, defaults to `[]`;
* `:client_properties` - A list of extra client properties to be sent to the server (defaults to `[])`;
* `:socket_options` - Extra socket options. These are appended to the default options. \
See http://www.erlang.org/doc/man/inet.html#setopts-2 and http://www.erlang.org/doc/man/gen_tcp.html#connect-4 \
for descriptions of the available options.
* `:auth_mechanisms` - A list of authentication of SASL authentication mechanisms to use.
See https://www.rabbitmq.com/access-control.html#mechanisms and https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl
for descriptions of the available options.
for descriptions of the available options;
* `:auth_mechanisms` - A list of authentication of SASL authentication mechanisms to use. \
See https://www.rabbitmq.com/access-control.html#mechanisms and https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl \
for descriptions of the available options;
* `:name` - A human-readable string that will be displayed in the management UI. \
Connection names do not have to be unique and cannot be used as connection identifiers \
(defaults to `:undefined`).
## Examples
iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest", name: "my-conn"]
iex> AMQP.Connection.open(options)
{:ok, %AMQP.Connection{}}
iex> AMQP.Connection.open("amqp://guest:guest@localhost", port: 5673)
{:ok, %AMQP.Connection{}}
## Enabling SSL
To enable SSL, supply the following in the `ssl_options` field:
* `cacertfile` - Specifies the certificates of the root Certificate Authorities that we wish to implicitly trust;
* `certfile` - The client's own certificate in PEM format;
* `keyfile` - The client's private key in PEM format;
* `:cacertfile` - Specifies the certificates of the root Certificate Authorities that we wish to implicitly trust;
* `:certfile` - The client's own certificate in PEM format;
* `:keyfile` - The client's private key in PEM format.
### Example
Here is an example:
```
AMQP.Connection.open port: 5671,
ssl_options: [cacertfile: '/path/to/testca/cacert.pem',
certfile: '/path/to/client/cert.pem',
keyfile: '/path/to/client/key.pem',
# only necessary with intermediate CAs
# depth: 2,
verify: :verify_peer,
fail_if_no_peer_cert: true]
```
iex> AMQP.Connection.open(
port: 5671,
ssl_options: [
cacertfile: '/path/to/testca/cacert.pem',
certfile: '/path/to/client/cert.pem',
keyfile: '/path/to/client/key.pem',
# only necessary with intermediate CAs
# depth: 2,
verify: :verify_peer,
fail_if_no_peer_cert: true
]
)
## Connection name
## Backward compatibility for connection name
RabbitMQ supports user-specified connection names since version 3.6.2.
Connection names are human-readable strings that will be displayed in the management UI.
Connection names do not have to be unique and cannot be used as connection identifiers.
If the name is `:undefined`, the connection is not registered with any name.
## Examples
Previously AMQP took a connection name as a separate parameter on `open/2` and `open/3` and it is still supported in this version.
iex> options = [host: "localhost", port: 5672, virtual_host: "/", username: "guest", password: "guest"]
iex> AMQP.Connection.open(options, :undefined)
{:ok, %AMQP.Connection{}}
iex> AMQP.Connection.open("amqp://guest:guest@localhost", port: 5673)
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection")
{:ok, %AMQP.Connection{}}
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "a-connection-with-a-name")
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "my-connection", options)
{:ok, %AMQP.Connection{}}
However the connection name parameter is now deprecated and might not be supported in the future versions.
You are recommented to pass it with `:name` option instead:
iex> AMQP.Connection.open("amqp://guest:guest@localhost", name: "my-connection")
{:ok, %AMQP.Connection{}}
"""
@spec open(keyword | String.t(), String.t() | :undefined | keyword) ::
@spec open(String.t() | keyword, keyword | String.t() | :undefined) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(options_or_uri, options_or_name)
def open(uri, options)

def open(uri, name) when is_binary(uri) and (is_binary(name) or name == :undefined) do
open(uri, name, _options = [])
do_open(uri, name, _options = [])
end

def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do
{name_from_opts, options} = take_connection_name(options)
name = if name == :undefined, do: name_from_opts, else: name

options
|> merge_options_to_default()
|> do_open(name)
end

def open(uri, options) when is_binary(uri) and is_list(options) do
open(uri, :undefined, options)
end
{name, options} = take_connection_name(options)

@doc """
Opens a new connection with a name, merging the given AMQP URI and options.
This function opens a new connection by merging the given AMQP URI `uri` and
the given `options` and assigns it the name `name`.
The options in `options` take precedence over the values in `uri`.
See `open/2` for options.
## Examples
iex> AMQP.Connection.open("amqp://guest:guest@localhost", "a-connection-name", port: 5673)
{:ok, %AMQP.Connection{}}
do_open(uri, name, options)
end

"""
@doc false
@deprecated "Use :name in open/2 instead"
@spec open(String.t(), String.t() | :undefined, keyword) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri, name, options) when is_binary(uri) and is_list(options) do
do_open(uri, name, options)
end

defp do_open(uri, name, options) do
case uri |> String.to_charlist() |> :amqp_uri.parse() do
{:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name)
error -> error
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
end

# take name from options
defp take_connection_name(options) do
name = options[:name] || :undefined
options = Keyword.delete(options, :name)
{name, options}
end

@doc false
@spec merge_options_to_amqp_params(tuple, keyword) :: tuple
def merge_options_to_amqp_params(amqp_params, options) do
Expand Down Expand Up @@ -220,13 +232,6 @@ defmodule AMQP.Connection do
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
end

defp normalize_ssl_options(options) when is_list(options) do
for {k, v} <- options do
if k in [:cacertfile, :certfile, :keyfile] do
Expand Down
33 changes: 23 additions & 10 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,35 @@ defmodule ConnectionTest do
assert :ok = Connection.close(conn)
end

test "open connection with uri, name, and options" do
test "open connection with uri, port as an integer, and options " do
assert {:ok, conn} =
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')
Connection.open("amqp://nonexistent",
host: 'localhost',
port: 5672
)

assert :ok = Connection.close(conn)
end

test "open connection with uri, name, port as an integer, and options " do
test "open connection with uri, port as a string, and options" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent", "my-connection",
Connection.open("amqp://nonexistent",
host: 'localhost',
port: 5672
port: "5672"
)

assert :ok = Connection.close(conn)
end

test "open connection with uri, name, port as a string, and options " do
test "open connection with name in options" do
assert {:ok, conn} = Connection.open("amqp://localhost", name: "my-connection")
assert get_connection_name(conn) == "my-connection"
assert :ok = Connection.close(conn)
end

test "open connection with uri, name, and options (deprected but still spported)" do
assert {:ok, conn} =
Connection.open("amqp://nonexistent", "my-connection",
host: 'localhost',
port: "5672"
)
Connection.open("amqp://nonexistent:5672", "my-connection", host: 'localhost')

assert :ok = Connection.close(conn)
end
Expand All @@ -70,4 +76,11 @@ defmodule ConnectionTest do
assert params[:password] == "bar"
assert params[:host] == 'amqp.test.com'
end

defp get_connection_name(conn) do
params = :amqp_connection.info(conn.pid, [:amqp_params])[:amqp_params]
amqp_params_network(client_properties: props) = params
{_, _, name} = Enum.find(props, fn {key, _type, _value} -> key == "connection_name" end)
name
end
end

0 comments on commit 824ec4d

Please sign in to comment.