Skip to content

Commit

Permalink
retry healthcheck with all available configs
Browse files Browse the repository at this point in the history
  • Loading branch information
elfkuzco committed Aug 5, 2024
1 parent 512c059 commit 5c467e2
Showing 1 changed file with 90 additions and 120 deletions.
210 changes: 90 additions & 120 deletions worker/manager/src/mirrors_qa_manager/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from urllib.parse import urlencode

import pycountry
from docker.models.containers import Container
from docker.errors import APIError
from docker.models.containers import Container, ExecResult
from docker.types import Mount

from mirrors_qa_manager import logger
Expand Down Expand Up @@ -112,56 +113,70 @@ def get_country_codes_from_config_files(self) -> list[str]:

return list(country_codes)

def copy_wireguard_conf_file(self, country_code: str | None = None) -> Path:
"""Path to copied-from-base-dir <country_code>.conf file in instance directory.
If country_code is None, first file with .conf suffix is copied.
Raises:
FileNotFoundError: configuration file was not found.
"""

if country_code:
pattern = f"{country_code}*.conf"
else:
pattern = "*.conf"

conf_name = None
try:
# Select a random file matching the pattern
conf_name = random.choice( # noqa: S311
list(self.base_dir.glob(pattern))
).name
except IndexError:
# no files found in base_dir
pass

if conf_name is None:
if country_code:
message = (
f"Configuration file for {country_code} was not "
f"found in {self.base_dir}"
)
else:
message = (
f"No wireguard configuration file was found in {self.base_dir}"
)
raise FileNotFoundError(message)

def copy_wireguard_conf_file(self, conf_fpath: Path) -> Path:
"""Copy configuration file to wireguard configuration folder."""
# Move the configuration file to the wg_confs folder of the wireguard
# container.
return shutil.copy(
self.base_dir / conf_name,
conf_fpath,
self.wg_confs_dir / f"{self.wg_interface}.conf",
)

def start_wireguard_container(self, image_name: str) -> Container:
# Copy the first configuration file we see during start up by passing
# no argument. Ensures configuration files actuallly exist before starting
# the wireguard container
self.copy_wireguard_conf_file()
def wg_container_is_healthy(self) -> ExecResult | None:
"""Check if a healthcheck command was successful on container."""
try:
return exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
)
except APIError:
return None

def wg_healthcheck_till_healthy(self, conf_fpaths: list[Path]) -> ExecResult | None:
"""Try wg healthcheck till status is healthy using configuration files."""
for conf_fpath in conf_fpaths:
# Copy the configuration file to the confs folder
self.copy_wireguard_conf_file(conf_fpath)
# After copying the file, restart the interface.
if self.wg_interface_status == WgInterfaceStatus.UP:
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_down_cmd,
)
except APIError as exc:
logger.debug(
f"error while bringing down wireguard interface: {exc!s}"
)
pass
else:
self.wg_interface_status = WgInterfaceStatus.DOWN

if self.wg_interface_status == WgInterfaceStatus.DOWN:
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_up_cmd,
)
except APIError as exc:
logger.debug(f"error while bringing up wireguard interface {exc!s}")
pass
else:
self.wg_interface_status = WgInterfaceStatus.UP

logger.debug(f"Checking wireguard interface status using {conf_fpath.name}")

if healthcheck_result := self.wg_container_is_healthy():
return healthcheck_result
return None

def start_wireguard_container(self, image_name: str, conf_fpath: Path) -> Container:
# Try and remove container if it wasn't removed before
self.remove_container(Settings.WIREGUARD_CONTAINER_NAME)
logger.info("Starting wireguard container")
self.copy_wireguard_conf_file(conf_fpath)
# Mount the wireguard directories using the host's fs, not this container's
mounts = [
Mount("/config", str(self.get_host_fpath(self.wg_root_dir)), type="bind"),
Expand Down Expand Up @@ -294,20 +309,30 @@ def get_country_code(self, country_name: str) -> str:

def run(self) -> None:
logger.info("Starting worker manager.")
# First time startup, get all available configuration files.
conf_fpaths = self.base_dir.glob("*.conf")
try:
# Fetch the first configuration file for starting the container.
conf_fpath = next(conf_fpaths)
except StopIteration as exc:
# no configuration files were found
message = f"No wireguard configuration file was found in {self.base_dir}"
raise FileNotFoundError(message) from exc
# Start the wireguard network container
self.start_wireguard_container(Settings.WIREGUARD_IMAGE)
logger.info(f"Starting wireguard container using {conf_fpath.name}.")
self.start_wireguard_container(Settings.WIREGUARD_IMAGE, conf_fpath=conf_fpath)
# When we start the container with the configuration file for the first
# time, the interface status is going to be UP. We keep track of this
# status before performing healthchecks on the container.
self.wg_interface_status = WgInterfaceStatus.UP
while True:
try:
# Ensure the wireguard container is still up
if (
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
).exit_code
== 0
):
self.wg_interface_status = WgInterfaceStatus.UP
if not self.wg_container_is_healthy():
# Try all the availalbe configuration files till container is up.
if self.wg_healthcheck_till_healthy(list(conf_fpaths)) is None:
error_message = "Unable to start wireguard container."
raise Exception(error_message)

# Update the worker list of countries using the configuration files
self.update_countries_list()
Expand All @@ -316,87 +341,32 @@ def run(self) -> None:
for test in tests:
test_id = test["id"]
country_code = test["country_code"]
# Fetch the configuration file for the requested country
try:
self.copy_wireguard_conf_file(country_code)
except FileNotFoundError:
# Fetch all configuration files for the requested country
conf_fpaths = list(self.base_dir.glob(f"{country_code}*.conf"))
if not conf_fpaths:
logger.error(
f"Could not find {country_code}.conf for "
f"Could not find any configuration file for {country_code}"
f"test {test_id}. Skipping test."
)
continue
except Exception:
logger.error(
f"error while fetching {country_code}.conf for {test_id}"
)
continue
# Shuffle the order of the configuration files so we don't
# always test with the same configuration file.
random.shuffle(conf_fpaths)

logger.info(
f"Reconfiguring wireguard network interface for {country_code}"
)

# After copying the file, restart the interface.
if self.wg_interface_status == WgInterfaceStatus.UP:
logger.info(
f"Bringing down wireguard interface for test {test_id} "
f"country_code: {country_code}"
)
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_down_cmd,
)
except Exception as exc:
logger.error(
f"error while bringing down wireguard interface "
f"for test {test_id}, country: {country_code}: {exc!s}"
)
continue
else:
self.wg_interface_status = WgInterfaceStatus.DOWN

if self.wg_interface_status == WgInterfaceStatus.DOWN:
logger.info(
f"Bringing up wireguard interface for test {test_id}, "
f"country_code: {country_code}"
)
try:
exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_up_cmd,
)
except Exception as exc:
logger.error(
f"error while bringing up wireguard interface "
f"for test {test_id}, country: {country_code}: "
f"{exc!s}"
)
continue
else:
self.wg_interface_status = WgInterfaceStatus.UP

# Perform another healthcheck to ensure traffic can go
# through. Result will be used for populating the IP-related
# data
logger.info(
"Checking if traffic can pass through wireguard interface "
f"for test {test_id}, country: {country_code}"
)
try:
healthcheck_result = exec_command(
self.docker,
Settings.WIREGUARD_CONTAINER_NAME,
self.wg_healthcheck_cmd,
)
except Exception as exc:
healthcheck_result = self.wg_healthcheck_till_healthy(conf_fpaths)
if healthcheck_result is None:
logger.error(
"error while pefroming wireguard healthcheck for "
f"test {test_id}, country: {country_code}, {exc!s}"
f"test {test_id}, country: {country_code}"
)
continue

logger.debug(f"Healthcheck result: {healthcheck_result.output}")

# Start container for the task
task_container_name = f"task-worker-{test_id}"
# It is possible that a container with the existing name already
Expand Down

0 comments on commit 5c467e2

Please sign in to comment.