diff --git a/worker/manager/src/mirrors_qa_manager/worker.py b/worker/manager/src/mirrors_qa_manager/worker.py index 98e05ba..2075555 100644 --- a/worker/manager/src/mirrors_qa_manager/worker.py +++ b/worker/manager/src/mirrors_qa_manager/worker.py @@ -7,6 +7,7 @@ import signal import sys import time +from collections.abc import Iterable from enum import Enum from pathlib import Path from typing import Any @@ -291,16 +292,29 @@ def update_countries_list(self): "countries." ) - def fetch_tests(self) -> list[dict[str, Any]]: + def fetch_tests(self) -> Iterable[dict[str, str]]: logger.debug("Fetching tasks from backend API") - # Fetch tasks that were assigned to the worker that haven't been expired params = urlencode({"worker_id": self.worker_id, "status": "PENDING"}) - data = self.query_api(f"/tests?{params}") + while True: + data = self.query_api(f"/tests?{params}") + nb_tests = data["metadata"]["page_size"] + if nb_tests == 0: # No more pending tests to fetch + break + + logger.info(f"Fetched {nb_tests} test(s) from Backend API") + + current_page = data["metadata"]["current_page"] + last_page = data["metadata"]["last_page"] + + logger.debug( + f"Fetched page {current_page} of {last_page} of pending tests." + ) - logger.info(f"Fetched {data['metadata']['page_size']} test(s) from Backend API") + yield from data["tests"] - return data["tests"] + if current_page == last_page: + break def sleep(self) -> None: logger.info(f"Sleeping for {Settings.SLEEP_SECONDS}s") @@ -340,8 +354,7 @@ def run(self) -> None: # Update the worker list of countries using the configuration files self.update_countries_list() - tests = self.fetch_tests() - for test in tests: + for test in self.fetch_tests(): test_id = test["id"] country_code = test["country_code"] # Fetch all configuration files for the requested country