diff --git a/karapace/auth.py b/karapace/auth.py index e70efe166..e7a71ac80 100644 --- a/karapace/auth.py +++ b/karapace/auth.py @@ -3,6 +3,7 @@ See LICENSE for details """ from base64 import b64encode +from collections import defaultdict from dataclasses import dataclass, field from enum import Enum, unique from hmac import compare_digest @@ -10,7 +11,7 @@ from karapace.rapu import JSON_CONTENT_TYPE from karapace.statsd import StatsClient from typing import Optional -from watchfiles import awatch +from watchfiles import awatch, Change import aiohttp import aiohttp.web @@ -87,24 +88,31 @@ async def start_refresh_task(self, stats: StatsClient) -> None: async def _refresh_authfile() -> None: """Reload authfile, but keep old auth data if loading fails""" - try: - # Poll delay for fallback mechanism if system notifications cannot be used. - async for changes in awatch( - self._auth_filename, poll_delay_ms=5000, stop_event=self._refresh_auth_awatch_stop_event - ): - unique_files = {filename for _, filename in changes} - for filename in unique_files: - if self._auth_filename in filename: - try: - self._load_authfile() - except InvalidConfiguration as e: - log.warning("Could not load authentication file: %s", e) - except asyncio.CancelledError: - log.info("Closing schema registry ACL refresh task") - return - except Exception as ex: # pylint: disable=broad-except - log.exception("Schema registry auth file could not be loaded") - stats.unexpected_exception(ex=ex, where="schema_registry_authfile_reloader") + while True: + try: + async for changes in awatch(self._auth_filename, stop_event=self._refresh_auth_awatch_stop_event): + changes_by_filename = defaultdict(set) + for change, filename in changes: + changes_by_filename[filename].add(change) + for filename in changes_by_filename.copy(): + if self._auth_filename in filename: + try: + self._load_authfile() + except InvalidConfiguration as e: + log.warning("Could not load authentication file: %s", e) + finally: + if Change.deleted in changes_by_filename[self._auth_filename]: + raise StopIteration + except asyncio.CancelledError: + log.info("Closing schema registry ACL refresh task") + return + except StopIteration: + # Reset watch after delete event (e.g. file is replaced) + pass + except Exception as ex: # pylint: disable=broad-except + log.exception("Schema registry auth file could not be loaded") + stats.unexpected_exception(ex=ex, where="schema_registry_authfile_reloader") + return self._refresh_auth_task = asyncio.create_task(_refresh_authfile())