Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add support for multiple rabbit hosts in con string #151

Merged
merged 4 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class _RabbitFields:
environment variables.
"""

rabbit_host: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_hosts: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_port: str = field(default_factory=partial(os.getenv, "RABBIT_PORT", None))
rabbit_username: str = field(
default_factory=partial(os.getenv, "RABBIT_USERNAME", None)
Expand Down
37 changes: 27 additions & 10 deletions OpenStack-Rabbit-Consumer/rabbit_consumer/message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,31 @@ def on_message(message: rabbitpy.Message) -> None:
message.ack()


def generate_login_str(config: ConsumerConfig) -> str:
"""
Generates the login string for the rabbit connection.
"""
if not config.rabbit_hosts:
raise ValueError("No rabbit hosts provided")

if not isinstance(config.rabbit_hosts, str):
raise ValueError("Rabbit hosts must be a comma separated string of hosts")

connect_str = "amqp://"

for host in config.rabbit_hosts.split(","):
host = host.strip()
connect_str += f"{config.rabbit_username}:{config.rabbit_password}@{host}:{config.rabbit_port},"

# Trim the trailing comma
connect_str = connect_str[:-1]

sanitised_connect_str = connect_str.replace(config.rabbit_password, "<password>")
logger.debug("Connecting to rabbit with: %s", sanitised_connect_str)
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved
DavidFair marked this conversation as resolved.
Show resolved Hide resolved

return connect_str


def initiate_consumer() -> None:
"""
Initiates the message consumer and starts consuming messages in a loop.
Expand All @@ -263,18 +288,10 @@ def initiate_consumer() -> None:
# Ensure we have valid creds before trying to contact rabbit
verify_kerberos_ticket()

config = ConsumerConfig()

host = config.rabbit_host
port = config.rabbit_port
login_user = config.rabbit_username
login_pass = config.rabbit_password
logger.debug(
"Connecting to rabbit with: amqp://%s:<password>@%s:%s/", login_user, host, port
)
exchanges = ["nova"]

login_str = f"amqp://{login_user}:{login_pass}@{host}:{port}/"
config = ConsumerConfig()
login_str = generate_login_str(config)
with rabbitpy.Connection(login_str) as conn:
with conn.channel() as channel:
logger.debug("Connected to RabbitMQ")
Expand Down
2 changes: 1 addition & 1 deletion OpenStack-Rabbit-Consumer/tests/test_consumer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
]

RABBIT_FIELDS = [
("rabbit_host", "RABBIT_HOST"),
("rabbit_hosts", "RABBIT_HOST"),
("rabbit_port", "RABBIT_PORT"),
("rabbit_username", "RABBIT_USERNAME"),
("rabbit_password", "RABBIT_PASSWORD"),
Expand Down
76 changes: 63 additions & 13 deletions OpenStack-Rabbit-Consumer/tests/test_message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is_aq_managed_image,
get_aq_build_metadata,
delete_machine,
generate_login_str,
)
from rabbit_consumer.vm_data import VmData

Expand Down Expand Up @@ -101,34 +102,82 @@ def test_on_message_accepts_event_types(message_event_type, consume, event_type)
message.ack.assert_called_once()


# pylint: disable=too-few-public-methods
class MockedConfig(ConsumerConfig):
@pytest.fixture(name="mocked_config")
def mocked_config_fixture() -> ConsumerConfig:
"""
Provides a mocked input config for the consumer
"""
config = ConsumerConfig()

rabbit_host = "rabbit_host"
rabbit_port = 1234
rabbit_username = "rabbit_username"
rabbit_password = "rabbit_password"
# Note: the mismatched spaces are intentional
config.rabbit_hosts = "rabbit_host1, rabbit_host2,rabbit_host3"
config.rabbit_port = 1234
config.rabbit_username = "rabbit_username"
config.rabbit_password = "rabbit_password"
return config


def test_generate_login_str(mocked_config):
"""
Test that the function generates the correct login string
"""
expected = (
"amqp://"
"rabbit_username:rabbit_password@rabbit_host1:1234,"
"rabbit_username:rabbit_password@rabbit_host2:1234,"
"rabbit_username:rabbit_password@rabbit_host3:1234"
)

assert generate_login_str(mocked_config) == expected


def test_generate_login_str_no_hosts(mocked_config):
"""
Test that the function raises if nothing is passed
"""
mocked_config.rabbit_hosts = ""
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


def test_generate_login_non_str(mocked_config):
"""
Test that the function raises if the input is not a string
"""
mocked_config.rabbit_hosts = 1234
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


@patch("rabbit_consumer.message_consumer.logger")
def test_password_does_not_get_logged(logging, mocked_config):
"""
Test that the password does not get logged
"""
returned_str = generate_login_str(mocked_config)
logging.debug.assert_called_once()
logging_arg = logging.debug.call_args[0][1]
assert mocked_config.rabbit_password in returned_str

# Check that the password is not in the log message
assert mocked_config.rabbit_username in logging_arg
assert mocked_config.rabbit_password not in logging_arg
meoflynn marked this conversation as resolved.
Show resolved Hide resolved


@patch("rabbit_consumer.message_consumer.verify_kerberos_ticket")
@patch("rabbit_consumer.message_consumer.generate_login_str")
@patch("rabbit_consumer.message_consumer.rabbitpy")
def test_initiate_consumer_channel_setup(rabbitpy, _):
def test_initiate_consumer_channel_setup(rabbitpy, gen_login, _, mocked_config):
"""
Test that the function sets up the channel and queue correctly
"""
mocked_config = MockedConfig()

with patch("rabbit_consumer.message_consumer.ConsumerConfig") as config:
config.return_value = mocked_config
initiate_consumer()

rabbitpy.Connection.assert_called_once_with(
f"amqp://{mocked_config.rabbit_username}:{mocked_config.rabbit_password}@{mocked_config.rabbit_host}:{mocked_config.rabbit_port}/"
)
gen_login.assert_called_once_with(mocked_config)

rabbitpy.Connection.assert_called_once_with(gen_login.return_value)
connection = rabbitpy.Connection.return_value.__enter__.return_value
connection.channel.assert_called_once()
channel = connection.channel.return_value.__enter__.return_value
Expand All @@ -149,7 +198,8 @@ def test_initiate_consumer_actual_consumption(rabbitpy, message_mock, _):
# We need our mocked queue to act like a generator
rabbitpy.Queue.return_value.__iter__.return_value = queue_messages

initiate_consumer()
with patch("rabbit_consumer.message_consumer.generate_login_str"):
initiate_consumer()

message_mock.assert_has_calls([call(message) for message in queue_messages])

Expand Down
2 changes: 1 addition & 1 deletion OpenStack-Rabbit-Consumer/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.7
3.0.0
Loading