Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry healthcheck with all available configs #33

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion worker/manager/src/mirrors_qa_manager/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def exec_command(
result = container.exec_run(cmd)
if result.exit_code != 0:
raise APIError(
f"Unable to execute command {cmd} in {container.name}, "
f"unable to execute command {cmd} in {container.name}, "
f"result: {result.output}, status: {result.exit_code}"
)
return result # pyright: ignore[reportReturnType]
211 changes: 91 additions & 120 deletions worker/manager/src/mirrors_qa_manager/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from urllib.parse import urlencode

import pycountry
from docker.models.containers import Container
from docker.errors import APIError
from docker.models.containers import Container, ExecResult
from docker.types import Mount

from mirrors_qa_manager import logger
Expand Down Expand Up @@ -112,56 +113,71 @@ def get_country_codes_from_config_files(self) -> list[str]:

return list(country_codes)

def copy_wireguard_conf_file(self, country_code: str | None = None) -> Path:
"""Path to copied-from-base-dir <country_code>.conf file in instance directory.

If country_code is None, first file with .conf suffix is copied.
Raises:
FileNotFoundError: configuration file was not found.
"""

if country_code:
pattern = f"{country_code}*.conf"
else:
pattern = "*.conf"

conf_name = None
try:
# Select a random file matching the pattern
conf_name = random.choice( # noqa: S311
list(self.base_dir.glob(pattern))
).name
except IndexError:
# no files found in base_dir
pass

if conf_name is None:
if country_code:
message = (
f"Configuration file for {country_code} was not "
f"found in {self.base_dir}"
)
else:
message = (
f"No wireguard configuration file was found in {self.base_dir}"
)
raise FileNotFoundError(message)

def copy_wireguard_conf_file(self, conf_fpath: Path) -> Path:
"""Copy configuration file to wireguard configuration folder."""
# Move the configuration file to the wg_confs folder of the wireguard
# container.
return shutil.copy(
self.base_dir / conf_name,
conf_fpath,
self.wg_confs_dir / f"{self.wg_interface}.conf",
)

def start_wireguard_container(self, image_name: str) -> Container:
# Copy the first configuration file we see during start up by passing
# no argument. Ensures configuration files actuallly exist before starting
# the wireguard container
self.copy_wireguard_conf_file()
def wg_container_is_healthy(self) -> ExecResult | None:
"""Check if a healthcheck command was successful on container."""
try:
return exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
)
except APIError:
logger.error("error whlie performing healthcheck: {exc!s}")
return None

def wg_healthcheck_till_healthy(self, conf_fpaths: list[Path]) -> ExecResult | None:
rgaudin marked this conversation as resolved.
Show resolved Hide resolved
"""Try wg healthcheck till status is healthy using configuration files."""
for conf_fpath in conf_fpaths:
# Copy the configuration file to the confs folder
self.copy_wireguard_conf_file(conf_fpath)
# After copying the file, restart the interface.
if self.wg_interface_status == WgInterfaceStatus.UP:
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_down_cmd,
)
except APIError as exc:
logger.debug(
f"error while bringing down wireguard interface: {exc!s}"
)
pass
else:
self.wg_interface_status = WgInterfaceStatus.DOWN

if self.wg_interface_status == WgInterfaceStatus.DOWN:
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_up_cmd,
)
except APIError as exc:
logger.debug(f"error while bringing up wireguard interface {exc!s}")
pass
else:
self.wg_interface_status = WgInterfaceStatus.UP

logger.debug(f"Checking wireguard interface status using {conf_fpath.name}")

if healthcheck_result := self.wg_container_is_healthy():
return healthcheck_result
return None

def start_wireguard_container(self, image_name: str, conf_fpath: Path) -> Container:
# Try and remove container if it wasn't removed before
self.remove_container(Settings.WIREGUARD_CONTAINER_NAME)
logger.info("Starting wireguard container")
self.copy_wireguard_conf_file(conf_fpath)
# Mount the wireguard directories using the host's fs, not this container's
mounts = [
Mount("/config", str(self.get_host_fpath(self.wg_root_dir)), type="bind"),
Expand Down Expand Up @@ -294,20 +310,30 @@ def get_country_code(self, country_name: str) -> str:

def run(self) -> None:
logger.info("Starting worker manager.")
# First time startup, get all available configuration files.
conf_fpaths = self.base_dir.glob("*.conf")
try:
# Fetch the first configuration file for starting the container.
conf_fpath = next(conf_fpaths)
except StopIteration as exc:
# no configuration files were found
message = f"No wireguard configuration file was found in {self.base_dir}"
raise FileNotFoundError(message) from exc
# Start the wireguard network container
self.start_wireguard_container(Settings.WIREGUARD_IMAGE)
logger.info(f"Starting wireguard container using {conf_fpath.name}.")
self.start_wireguard_container(Settings.WIREGUARD_IMAGE, conf_fpath=conf_fpath)
# When we start the container with the configuration file for the first
# time, the interface status is going to be UP. We keep track of this
# status before performing healthchecks on the container.
self.wg_interface_status = WgInterfaceStatus.UP
while True:
try:
# Ensure the wireguard container is still up
if (
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
).exit_code
== 0
):
self.wg_interface_status = WgInterfaceStatus.UP
if not self.wg_container_is_healthy():
# Try all the availalbe configuration files till container is up.
if self.wg_healthcheck_till_healthy(list(conf_fpaths)) is None:
rgaudin marked this conversation as resolved.
Show resolved Hide resolved
error_message = "Unable to start wireguard container."
raise Exception(error_message)

# Update the worker list of countries using the configuration files
self.update_countries_list()
Expand All @@ -316,87 +342,32 @@ def run(self) -> None:
for test in tests:
test_id = test["id"]
country_code = test["country_code"]
# Fetch the configuration file for the requested country
try:
self.copy_wireguard_conf_file(country_code)
except FileNotFoundError:
# Fetch all configuration files for the requested country
conf_fpaths = list(self.base_dir.glob(f"{country_code}*.conf"))
if not conf_fpaths:
logger.error(
f"Could not find {country_code}.conf for "
f"Could not find any configuration file for {country_code}"
f"test {test_id}. Skipping test."
)
continue
except Exception:
logger.error(
f"error while fetching {country_code}.conf for {test_id}"
)
continue
# Shuffle the order of the configuration files so we don't
# always test with the same configuration file.
random.shuffle(conf_fpaths)

logger.info(
f"Reconfiguring wireguard network interface for {country_code}"
)

# After copying the file, restart the interface.
if self.wg_interface_status == WgInterfaceStatus.UP:
logger.info(
f"Bringing down wireguard interface for test {test_id} "
f"country_code: {country_code}"
)
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_down_cmd,
)
except Exception as exc:
logger.error(
f"error while bringing down wireguard interface "
f"for test {test_id}, country: {country_code}: {exc!s}"
)
continue
else:
self.wg_interface_status = WgInterfaceStatus.DOWN

if self.wg_interface_status == WgInterfaceStatus.DOWN:
logger.info(
f"Bringing up wireguard interface for test {test_id}, "
f"country_code: {country_code}"
)
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_up_cmd,
)
except Exception as exc:
logger.error(
f"error while bringing up wireguard interface "
f"for test {test_id}, country: {country_code}: "
f"{exc!s}"
)
continue
else:
self.wg_interface_status = WgInterfaceStatus.UP

# Perform another healthcheck to ensure traffic can go
# through. Result will be used for populating the IP-related
# data
logger.info(
"Checking if traffic can pass through wireguard interface "
f"for test {test_id}, country: {country_code}"
)
try:
healthcheck_result = exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
)
except Exception as exc:
healthcheck_result = self.wg_healthcheck_till_healthy(conf_fpaths)
rgaudin marked this conversation as resolved.
Show resolved Hide resolved
if healthcheck_result is None:
logger.error(
"error while pefroming wireguard healthcheck for "
f"test {test_id}, country: {country_code}, {exc!s}"
f"test {test_id}, country: {country_code}"
)
continue

logger.debug(f"Healthcheck result: {healthcheck_result.output}")

# Start container for the task
task_container_name = f"task-worker-{test_id}"
# It is possible that a container with the existing name already
Expand Down
Loading