Skip to content

Commit

Permalink
Merge pull request #151 from stfc/Connect_to_rabbit_hosts_directly
Browse files Browse the repository at this point in the history
ENH: Add support for multiple rabbit hosts in con string
  • Loading branch information
meoflynn committed Jul 2, 2024
2 parents 62c065e + 94117d4 commit 66d1861
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class _RabbitFields:
environment variables.
"""

rabbit_host: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_hosts: str = field(default_factory=partial(os.getenv, "RABBIT_HOST", None))
rabbit_port: str = field(default_factory=partial(os.getenv, "RABBIT_PORT", None))
rabbit_username: str = field(
default_factory=partial(os.getenv, "RABBIT_USERNAME", None)
Expand Down
39 changes: 29 additions & 10 deletions OpenStack-Rabbit-Consumer/rabbit_consumer/message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,33 @@ def on_message(message: rabbitpy.Message) -> None:
message.ack()


def generate_login_str(config: ConsumerConfig) -> str:
"""
Generates the login string for the rabbit connection.
"""
if not config.rabbit_hosts:
raise ValueError("No rabbit hosts provided")

if not isinstance(config.rabbit_hosts, str):
raise ValueError("Rabbit hosts must be a comma separated string of hosts")

debug_str = "amqp://"
connect_str = "amqp://"

for host in config.rabbit_hosts.split(","):
host = host.strip()
connect_str += f"{config.rabbit_username}:{config.rabbit_password}@{host}:{config.rabbit_port},"
debug_str += f"{config.rabbit_username}:<password>@{host}:{config.rabbit_port},"

# Trim the trailing comma
connect_str = connect_str[:-1]
debug_str = debug_str[:-1]

logger.debug("Connecting to rabbit with: %s", debug_str)

return connect_str


def initiate_consumer() -> None:
"""
Initiates the message consumer and starts consuming messages in a loop.
Expand All @@ -263,18 +290,10 @@ def initiate_consumer() -> None:
# Ensure we have valid creds before trying to contact rabbit
verify_kerberos_ticket()

config = ConsumerConfig()

host = config.rabbit_host
port = config.rabbit_port
login_user = config.rabbit_username
login_pass = config.rabbit_password
logger.debug(
"Connecting to rabbit with: amqp://%s:<password>@%s:%s/", login_user, host, port
)
exchanges = ["nova"]

login_str = f"amqp://{login_user}:{login_pass}@{host}:{port}/"
config = ConsumerConfig()
login_str = generate_login_str(config)
with rabbitpy.Connection(login_str) as conn:
with conn.channel() as channel:
logger.debug("Connected to RabbitMQ")
Expand Down
2 changes: 1 addition & 1 deletion OpenStack-Rabbit-Consumer/tests/test_consumer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
]

RABBIT_FIELDS = [
("rabbit_host", "RABBIT_HOST"),
("rabbit_hosts", "RABBIT_HOST"),
("rabbit_port", "RABBIT_PORT"),
("rabbit_username", "RABBIT_USERNAME"),
("rabbit_password", "RABBIT_PASSWORD"),
Expand Down
76 changes: 63 additions & 13 deletions OpenStack-Rabbit-Consumer/tests/test_message_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is_aq_managed_image,
get_aq_build_metadata,
delete_machine,
generate_login_str,
)
from rabbit_consumer.vm_data import VmData

Expand Down Expand Up @@ -101,34 +102,82 @@ def test_on_message_accepts_event_types(message_event_type, consume, event_type)
message.ack.assert_called_once()


# pylint: disable=too-few-public-methods
class MockedConfig(ConsumerConfig):
@pytest.fixture(name="mocked_config")
def mocked_config_fixture() -> ConsumerConfig:
"""
Provides a mocked input config for the consumer
"""
config = ConsumerConfig()

rabbit_host = "rabbit_host"
rabbit_port = 1234
rabbit_username = "rabbit_username"
rabbit_password = "rabbit_password"
# Note: the mismatched spaces are intentional
config.rabbit_hosts = "rabbit_host1, rabbit_host2,rabbit_host3"
config.rabbit_port = 1234
config.rabbit_username = "rabbit_username"
config.rabbit_password = "rabbit_password"
return config


def test_generate_login_str(mocked_config):
"""
Test that the function generates the correct login string
"""
expected = (
"amqp://"
"rabbit_username:rabbit_password@rabbit_host1:1234,"
"rabbit_username:rabbit_password@rabbit_host2:1234,"
"rabbit_username:rabbit_password@rabbit_host3:1234"
)

assert generate_login_str(mocked_config) == expected


def test_generate_login_str_no_hosts(mocked_config):
"""
Test that the function raises if nothing is passed
"""
mocked_config.rabbit_hosts = ""
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


def test_generate_login_non_str(mocked_config):
"""
Test that the function raises if the input is not a string
"""
mocked_config.rabbit_hosts = 1234
with pytest.raises(ValueError):
assert generate_login_str(mocked_config)


@patch("rabbit_consumer.message_consumer.logger")
def test_password_does_not_get_logged(logging, mocked_config):
"""
Test that the password does not get logged
"""
returned_str = generate_login_str(mocked_config)
logging.debug.assert_called_once()
logging_arg = logging.debug.call_args[0][1]
assert mocked_config.rabbit_password in returned_str

# Check that the password is not in the log message
assert mocked_config.rabbit_username in logging_arg
assert mocked_config.rabbit_password not in logging_arg


@patch("rabbit_consumer.message_consumer.verify_kerberos_ticket")
@patch("rabbit_consumer.message_consumer.generate_login_str")
@patch("rabbit_consumer.message_consumer.rabbitpy")
def test_initiate_consumer_channel_setup(rabbitpy, _):
def test_initiate_consumer_channel_setup(rabbitpy, gen_login, _, mocked_config):
"""
Test that the function sets up the channel and queue correctly
"""
mocked_config = MockedConfig()

with patch("rabbit_consumer.message_consumer.ConsumerConfig") as config:
config.return_value = mocked_config
initiate_consumer()

rabbitpy.Connection.assert_called_once_with(
f"amqp://{mocked_config.rabbit_username}:{mocked_config.rabbit_password}@{mocked_config.rabbit_host}:{mocked_config.rabbit_port}/"
)
gen_login.assert_called_once_with(mocked_config)

rabbitpy.Connection.assert_called_once_with(gen_login.return_value)
connection = rabbitpy.Connection.return_value.__enter__.return_value
connection.channel.assert_called_once()
channel = connection.channel.return_value.__enter__.return_value
Expand All @@ -149,7 +198,8 @@ def test_initiate_consumer_actual_consumption(rabbitpy, message_mock, _):
# We need our mocked queue to act like a generator
rabbitpy.Queue.return_value.__iter__.return_value = queue_messages

initiate_consumer()
with patch("rabbit_consumer.message_consumer.generate_login_str"):
initiate_consumer()

message_mock.assert_has_calls([call(message) for message in queue_messages])

Expand Down
2 changes: 1 addition & 1 deletion OpenStack-Rabbit-Consumer/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.7
3.0.0
4 changes: 2 additions & 2 deletions charts/rabbit-consumer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.6.1
version: 1.7.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v2.3.7"
appVersion: "v3.0.0"
2 changes: 1 addition & 1 deletion charts/rabbit-consumer/prod-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ consumer:
defaultPrefix: vm-openstack-Prod-

rabbitmq:
host: openstack.stfc.ac.uk
host: hv747.nubes.rl.ac.uk

openstack:
authUrl: https://openstack.stfc.ac.uk:5000/v3
Expand Down

0 comments on commit 66d1861

Please sign in to comment.