Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rabbitmq additions master #55767

Merged
merged 11 commits into from
Jan 2, 2020
Merged
1 change: 1 addition & 0 deletions doc/ref/states/all/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ state modules
rabbitmq_cluster
rabbitmq_plugin
rabbitmq_policy
rabbitmq_upstream
rabbitmq_user
rabbitmq_vhost
rbac_solaris
Expand Down
6 changes: 6 additions & 0 deletions doc/ref/states/all/salt.states.rabbitmq_upstream.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
=============================
salt.states.rabbitmq_upstream
=============================

.. automodule:: salt.states.rabbitmq_upstream
:members:
267 changes: 244 additions & 23 deletions salt/modules/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def _safe_output(line):
line.startswith('Listing') and line.endswith('...'),
line.startswith('Listing') and '\t' not in line,
'...done' in line,
line.startswith('WARNING:')
line.startswith('WARNING:'),
len(line) == 0
])


Expand Down Expand Up @@ -256,6 +257,35 @@ def list_vhosts(runas=None):
return _output_to_list(res['stdout'])


def list_upstreams(runas=None):
'''
Returns a dict of upstreams based on rabbitmqctl list_parameters.

:param str runas: The name of the user to run this command as.

CLI Example:

.. code-block:: bash

salt '*' rabbitmq.list_upstreams

.. versionadded:: Neon
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
ret = {}
res = __salt__['cmd.run_all'](
[RABBITMQCTL, 'list_parameters', '-q'],
reset_system_locale=False,
runas=runas,
python_shell=False)
for raw_line in res['stdout'].split('\n'):
if _safe_output(raw_line):
(_, name, definition) = raw_line.split('\t')
ret[name] = definition
return ret


def user_exists(name, runas=None):
'''
Return whether the user exists based on rabbitmqctl list_users.
Expand Down Expand Up @@ -286,6 +316,26 @@ def vhost_exists(name, runas=None):
return name in list_vhosts(runas=runas)


def upstream_exists(name, runas=None):
'''
Return whether the upstreamexists based on rabbitmqctl list_parameters.

:param str name: The name of the upstream to check for.
:param str runas: The name of the user to run the command as.

CLI Example:

.. code-block:: bash

salt '*' rabbitmq.upstream_exists rabbit_upstream

.. versionadded:: Neon
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
return name in list_upstreams(runas=runas)


def add_user(name, password=None, runas=None):
'''
Add a rabbitMQ user via rabbitmqctl user_add <user> <password>
Expand Down Expand Up @@ -442,7 +492,11 @@ def check_password(name, password, runas=None):
runas = salt.utils.user.get_user()

try:
res = __salt__['cmd.run']([RABBITMQCTL, 'status'], reset_system_locale=False, runas=runas, python_shell=False)
res = __salt__['cmd.run'](
[RABBITMQCTL, 'status'],
reset_system_locale=False,
runas=runas,
python_shell=False)
server_version = re.search(r'\{rabbit,"RabbitMQ","(.+)"\}', res)

if server_version is None:
Expand Down Expand Up @@ -484,9 +538,9 @@ def check_password(name, password, runas=None):
return True

cmd = ('rabbit_auth_backend_internal:check_user_login'
'(<<"{0}">>, [{{password, <<"{1}">>}}]).').format(
name.replace('"', '\\"'),
password.replace('"', '\\"'))
'(<<"{0}">>, [{{password, <<"{1}">>}}]).').format(
name.replace('"', '\\"'),
password.replace('"', '\\"'))

res = __salt__['cmd.run_all'](
[RABBITMQCTL, 'eval', cmd],
Expand Down Expand Up @@ -696,7 +750,11 @@ def join_cluster(host, user='rabbit', ram_node=None, runas=None):
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
stop_app(runas)
res = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
res = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
start_app(runas)

return _format_response(res, 'Join')
Expand Down Expand Up @@ -800,7 +858,11 @@ def list_queues(runas=None, *args):
runas = salt.utils.user.get_user()
cmd = [RABBITMQCTL, 'list_queues', '-q']
cmd.extend(args)
res = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
res = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(res)
return _output_to_dict(res['stdout'])

Expand All @@ -822,7 +884,11 @@ def list_queues_vhost(vhost, runas=None, *args):
runas = salt.utils.user.get_user()
cmd = [RABBITMQCTL, 'list_queues', '-q', '-p', vhost]
cmd.extend(args)
res = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
res = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(res)
return _output_to_dict(res['stdout'])

Expand Down Expand Up @@ -923,7 +989,11 @@ def set_policy(vhost,
if apply_to:
cmd.extend(['--apply-to', apply_to])
cmd.extend([name, pattern, definition])
res = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
res = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
log.debug('Set policy: %s', res['stdout'])
return _format_response(res, 'Set')

Expand Down Expand Up @@ -971,36 +1041,44 @@ def policy_exists(vhost, name, runas=None):

def list_available_plugins(runas=None):
'''
Returns a list of the names of all available plugins (enabled and disabled).
Returns a list of the names of all available plugins (enabled and disabled).

CLI Example:
CLI Example:

.. code-block:: bash
.. code-block:: bash

salt '*' rabbitmq.list_available_plugins
'''
salt '*' rabbitmq.list_available_plugins
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
cmd = [_get_rabbitmq_plugin(), 'list', '-m']
ret = __salt__['cmd.run_all'](cmd, reset_system_locale=False, python_shell=False, runas=runas)
ret = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(ret)
return _output_to_list(ret['stdout'])


def list_enabled_plugins(runas=None):
'''
Returns a list of the names of the enabled plugins.
Returns a list of the names of the enabled plugins.

CLI Example:
CLI Example:

.. code-block:: bash
.. code-block:: bash

salt '*' rabbitmq.list_enabled_plugins
'''
salt '*' rabbitmq.list_enabled_plugins
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
cmd = [_get_rabbitmq_plugin(), 'list', '-m', '-e']
ret = __salt__['cmd.run_all'](cmd, reset_system_locale=False, python_shell=False, runas=runas)
ret = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(ret)
return _output_to_list(ret['stdout'])

Expand Down Expand Up @@ -1033,7 +1111,11 @@ def enable_plugin(name, runas=None):
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
cmd = [_get_rabbitmq_plugin(), 'enable', name]
ret = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
ret = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
return _format_response(ret, 'Enabled')


Expand All @@ -1050,5 +1132,144 @@ def disable_plugin(name, runas=None):
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
cmd = [_get_rabbitmq_plugin(), 'disable', name]
ret = __salt__['cmd.run_all'](cmd, reset_system_locale=False, runas=runas, python_shell=False)
ret = __salt__['cmd.run_all'](
cmd,
reset_system_locale=False,
runas=runas,
python_shell=False)
return _format_response(ret, 'Disabled')


def set_upstream(
name,
uri,
prefetch_count=None,
reconnect_delay=None,
ack_mode=None,
trust_user_id=None,
exchange=None,
max_hops=None,
expires=None,
message_ttl=None,
ha_policy=None,
queue=None,
runas=None):
'''
Configures an upstream via rabbitmqctl set_parameter. This can be an exchange-upstream,
a queue-upstream or both.

:param str name: The name of the upstream to configure.

The following parameters apply to federated exchanges and federated queues:

:param str uri: The AMQP URI(s) for the upstream.
:param int prefetch_count: The maximum number of unacknowledged messages copied
over a link at any one time. Default: 1000
:param int reconnect_delay: The duration (in seconds) to wait before reconnecting
to the broker after being disconnected. Default: 1
:param str ack_mode: Determines how the link should acknowledge messages.
If set to ``on-confirm`` (the default), messages are acknowledged to the
upstream broker after they have been confirmed downstream. This handles
network errors and broker failures without losing messages, and is the
slowest option.
If set to ``on-publish``, messages are acknowledged to the upstream broker
after they have been published downstream. This handles network errors
without losing messages, but may lose messages in the event of broker failures.
If set to ``no-ack``, message acknowledgements are not used. This is the
fastest option, but may lose messages in the event of network or broker failures.
:param bool trust_user_id: Determines how federation should interact with the
validated user-id feature. If set to true, federation will pass through
any validated user-id from the upstream, even though it cannot validate
it itself. If set to false or not set, it will clear any validated user-id
it encounters. You should only set this to true if you trust the upstream
server (and by extension, all its upstreams) not to forge user-ids.

The following parameters apply to federated exchanges only:

:param str exchange: The name of the upstream exchange. Default is to use the
same name as the federated exchange.
:param int max_hops: The maximum number of federation links that a message
published to a federated exchange can traverse before it is discarded.
Default is 1. Note that even if max-hops is set to a value greater than 1,
messages will never visit the same node twice due to travelling in a loop.
However, messages may still be duplicated if it is possible for them to
travel from the source to the destination via multiple routes.
:param int expires: The expiry time (in milliseconds) after which an upstream
queue for a federated exchange may be deleted, if a connection to the upstream
broker is lost. The default is 'none', meaning the queue should never expire.
This setting controls how long the upstream queue will last before it is
eligible for deletion if the connection is lost.
This value is used to set the "x-expires" argument for the upstream queue.
:param int message_ttl: The expiry time for messages in the upstream queue
for a federated exchange (see expires), in milliseconds. Default is ``None``,
meaning messages should never expire. This does not apply to federated queues.
This value is used to set the "x-message-ttl" argument for the upstream queue.
:param str ha_policy: Determines the "x-ha-policy" argument for the upstream
queue for a federated exchange (see expires). This is only of interest
when connecting to old brokers which determine queue HA mode using this
argument. Default is ``None``, meaning the queue is not HA.

The following parameter applies to federated queues only:

:param str queue: The name of the upstream queue. Default is to use the same
name as the federated queue.

:param str runas: The name of the user to run the command as.

CLI Example:

.. code-block:: bash

salt '*' rabbitmq.set_upstream upstream_name ack_mode=on-confirm max_hops=1 \
trust_user_id=True uri=amqp://hostname

.. versionadded:: Neon
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
params = salt.utils.data.filter_falsey({
'uri': uri,
'prefetch-count': prefetch_count,
'reconnect-delay': reconnect_delay,
'ack-mode': ack_mode,
'trust-user-id': trust_user_id,
'exchange': exchange,
'max-hops': max_hops,
'expires': expires,
'message-ttl': message_ttl,
'ha-policy': ha_policy,
'queue': queue,
})
res = __salt__['cmd.run_all'](
[RABBITMQCTL, 'set_parameter', 'federation-upstream', name, salt.utils.json.dumps(params)],
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(res)
return True


def delete_upstream(name, runas=None):
'''
Deletes an upstream via rabbitmqctl clear_parameter.

:param str name: The name of the upstream to delete.
:param str runas: The name of the user to run the command as.

CLI Example:

.. code-block:: bash

salt '*' rabbitmq.delete_upstream upstream_name

.. versionadded:: Neon
'''
if runas is None and not salt.utils.platform.is_windows():
runas = salt.utils.user.get_user()
res = __salt__['cmd.run_all'](
[RABBITMQCTL, 'clear_parameter', 'federation-upstream', name],
reset_system_locale=False,
runas=runas,
python_shell=False)
_check_response(res)
return True
Loading