Skip to content

Commit

Permalink
ENH: Add support for multiple rabbit hosts in con string
Browse files Browse the repository at this point in the history
Support constructing a connection string which contains multiple host
endpoints. This avoids us going through an LB and matches the approach
used by OpenStack more closely.
  • Loading branch information
DavidFair committed Jul 2, 2024
1 parent d23281b commit ffb52b7
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class _RabbitFields:
environment variables.
"""

rabbit_host: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_hosts: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_port: str = field(default_factory=partial(os.getenv, "RABBIT_PORT", None))
rabbit_username: str = field(
default_factory=partial(os.getenv, "RABBIT_USERNAME", None)
Expand Down
37 changes: 27 additions & 10 deletions OpenStack-Rabbit-Consumer/rabbit_consumer/message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,31 @@ def on_message(message: rabbitpy.Message) -> None:
message.ack()


def generate_login_str(config: ConsumerConfig) -> str:
"""
Generates the login string for the rabbit connection.
"""
if not config.rabbit_hosts:
raise ValueError("No rabbit hosts provided")

if not isinstance(config.rabbit_hosts, str):
raise ValueError("Rabbit hosts must be a comma separated string of hosts")

connect_str = "amqp://"

for host in config.rabbit_hosts.split(","):
host = host.strip()
connect_str += f"{config.rabbit_username}:{config.rabbit_password}@{host}:{config.rabbit_port},"

# Trim the trailing comma
connect_str = connect_str[:-1]

sanitised_connect_str = connect_str.replace(config.rabbit_password, "<password>")
logger.debug("Connecting to rabbit with: %s", sanitised_connect_str)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.

return connect_str


def initiate_consumer() -> None:
"""
Initiates the message consumer and starts consuming messages in a loop.
Expand All @@ -263,18 +288,10 @@ def initiate_consumer() -> None:
# Ensure we have valid creds before trying to contact rabbit
verify_kerberos_ticket()

config = ConsumerConfig()

host = config.rabbit_host
port = config.rabbit_port
login_user = config.rabbit_username
login_pass = config.rabbit_password
logger.debug(
"Connecting to rabbit with: amqp://%s:<password>@%s:%s/", login_user, host, port
)
exchanges = ["nova"]

login_str = f"amqp://{login_user}:{login_pass}@{host}:{port}/"
config = ConsumerConfig()
login_str = generate_login_str(config)
with rabbitpy.Connection(login_str) as conn:
with conn.channel() as channel:
logger.debug("Connected to RabbitMQ")
Expand Down
76 changes: 63 additions & 13 deletions OpenStack-Rabbit-Consumer/tests/test_message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is_aq_managed_image,
get_aq_build_metadata,
delete_machine,
generate_login_str,
)
from rabbit_consumer.vm_data import VmData

Expand Down Expand Up @@ -101,34 +102,82 @@ def test_on_message_accepts_event_types(message_event_type, consume, event_type)
message.ack.assert_called_once()


# pylint: disable=too-few-public-methods
class MockedConfig(ConsumerConfig):
@pytest.fixture(name="mocked_config")
def mocked_config_fixture() -> ConsumerConfig:
"""
Provides a mocked input config for the consumer
"""
config = ConsumerConfig()

rabbit_host = "rabbit_host"
rabbit_port = 1234
rabbit_username = "rabbit_username"
rabbit_password = "rabbit_password"
# Note: the mismatched spaces are intentional
config.rabbit_hosts = "rabbit_host1, rabbit_host2,rabbit_host3"
config.rabbit_port = 1234
config.rabbit_username = "rabbit_username"
config.rabbit_password = "rabbit_password"
return config


def test_generate_login_str(mocked_config):
"""
Test that the function generates the correct login string
"""
expected = (
"amqp://"
"rabbit_username:rabbit_password@rabbit_host1:1234,"
"rabbit_username:rabbit_password@rabbit_host2:1234,"
"rabbit_username:rabbit_password@rabbit_host3:1234"
)

assert generate_login_str(mocked_config) == expected


def test_generate_login_str_no_hosts(mocked_config):
"""
Test that the function raises if nothing is passed
"""
mocked_config.rabbit_hosts = ""
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


def test_generate_login_non_str(mocked_config):
"""
Test that the function raises if the input is not a string
"""
mocked_config.rabbit_hosts = 1234
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


@patch("rabbit_consumer.message_consumer.logger")
def test_password_does_not_get_logged(logging, mocked_config):
"""
Test that the password does not get logged
"""
returned_str = generate_login_str(mocked_config)
logging.debug.assert_called_once()
logging_arg = logging.debug.call_args[0][1]
assert mocked_config.rabbit_password in returned_str

# Check that the password is not in the log message
assert mocked_config.rabbit_username in logging_arg
assert mocked_config.rabbit_password not in logging_arg


@patch("rabbit_consumer.message_consumer.verify_kerberos_ticket")
@patch("rabbit_consumer.message_consumer.generate_login_str")
@patch("rabbit_consumer.message_consumer.rabbitpy")
def test_initiate_consumer_channel_setup(rabbitpy, _):
def test_initiate_consumer_channel_setup(rabbitpy, gen_login, _, mocked_config):
"""
Test that the function sets up the channel and queue correctly
"""
mocked_config = MockedConfig()

with patch("rabbit_consumer.message_consumer.ConsumerConfig") as config:
config.return_value = mocked_config
initiate_consumer()

rabbitpy.Connection.assert_called_once_with(
f"amqp://{mocked_config.rabbit_username}:{mocked_config.rabbit_password}@{mocked_config.rabbit_host}:{mocked_config.rabbit_port}/"
)
gen_login.assert_called_once_with(mocked_config)

rabbitpy.Connection.assert_called_once_with(gen_login.return_value)
connection = rabbitpy.Connection.return_value.__enter__.return_value
connection.channel.assert_called_once()
channel = connection.channel.return_value.__enter__.return_value
Expand All @@ -149,7 +198,8 @@ def test_initiate_consumer_actual_consumption(rabbitpy, message_mock, _):
# We need our mocked queue to act like a generator
rabbitpy.Queue.return_value.__iter__.return_value = queue_messages

initiate_consumer()
with patch("rabbit_consumer.message_consumer.generate_login_str"):
initiate_consumer()

message_mock.assert_has_calls([call(message) for message in queue_messages])

Expand Down
2 changes: 1 addition & 1 deletion OpenStack-Rabbit-Consumer/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.7
3.0.0

0 comments on commit ffb52b7

Please sign in to comment.