Merge pull request #738 from maresb/refactor-lookup-again
More refactoring of lookup module
maresb authored Oct 29, 2024
2 parents 1491c29 + 5f3c267 commit 1436bc1
Showing 3 changed files with 515 additions and 105 deletions.
119 changes: 14 additions & 105 deletions conda_lock/lookup.py
@@ -1,4 +1,3 @@
import hashlib
import json
import logging
import time
@@ -7,12 +6,10 @@
from pathlib import Path
from typing import Dict, TypedDict

import requests

from filelock import FileLock, Timeout
from packaging.utils import NormalizedName, canonicalize_name
from packaging.utils import NormalizedName
from packaging.utils import canonicalize_name as canonicalize_pypi_name
from platformdirs import user_cache_path

from conda_lock.lookup_cache import cached_download_file


logger = logging.getLogger(__name__)
@@ -31,7 +28,7 @@ class MappingEntry(TypedDict):
def _get_pypi_lookup(mapping_url: str) -> Dict[NormalizedName, MappingEntry]:
url = mapping_url
if url.startswith("http://") or url.startswith("https://"):
content = cached_download_file(url)
content = cached_download_file(url, cache_subdir_name="pypi-mapping")
else:
if url.startswith("file://"):
path = url[len("file://") :]
@@ -51,9 +48,9 @@ def _get_pypi_lookup(mapping_url: str) -> Dict[NormalizedName, MappingEntry]:
logger.debug(f"Loaded {len(lookup)} entries in {load_duration:.2f}s")
# lowercase and kebabcase the pypi names
assert lookup is not None
lookup = {canonicalize_name(k): v for k, v in lookup.items()}
lookup = {canonicalize_pypi_name(k): v for k, v in lookup.items()}
for v in lookup.values():
v["pypi_name"] = canonicalize_name(v["pypi_name"])
v["pypi_name"] = canonicalize_pypi_name(v["pypi_name"])
return lookup


@@ -68,18 +65,15 @@ def pypi_name_to_conda_name(name: str, mapping_url: str) -> str:
'zpfqzvrj'
"""
cname = canonicalize_pypi_name(name)
if cname in _get_pypi_lookup(mapping_url):
lookup = _get_pypi_lookup(mapping_url)[cname]
res = lookup.get("conda_name") or lookup.get("conda_forge")
lookup = _get_pypi_lookup(mapping_url)
if cname in lookup:
entry = lookup[cname]
res = entry.get("conda_name") or entry.get("conda_forge")
if res is not None:
return res
else:
logging.warning(
f"Could not find conda name for {cname}. Assuming identity."
)
return cname
else:
return cname

logger.warning(f"Could not find conda name for {cname}. Assuming identity.")
return cname


@lru_cache(maxsize=None)
@@ -96,90 +90,5 @@ def _get_conda_lookup(mapping_url: str) -> Dict[str, MappingEntry]:
def conda_name_to_pypi_name(name: str, mapping_url: str) -> NormalizedName:
"""return the pypi name for a conda package"""
lookup = _get_conda_lookup(mapping_url=mapping_url)
cname = canonicalize_name(name)
cname = canonicalize_pypi_name(name)
return lookup.get(cname, {"pypi_name": cname})["pypi_name"]


def cached_download_file(url: str) -> bytes:
"""Download a file and cache it in the user cache directory.
If the file is already cached, return the cached contents.
If the file is not cached, download it and cache the contents
and the ETag.
Protect against multiple processes downloading the same file.
"""
CLEAR_CACHE_AFTER_SECONDS = 60 * 60 * 24 * 2 # 2 days
DONT_CHECK_IF_NEWER_THAN_SECONDS = 60 * 5 # 5 minutes
current_time = time.time()
cache = user_cache_path("conda-lock", appauthor=False)
cache.mkdir(parents=True, exist_ok=True)

# clear out old cache files
for file in cache.iterdir():
if file.name.startswith("pypi-mapping-"):
mtime = file.stat().st_mtime
age = current_time - mtime
if age < 0 or age > CLEAR_CACHE_AFTER_SECONDS:
logger.debug("Removing old cache file %s", file)
file.unlink()

url_hash = hashlib.sha256(url.encode()).hexdigest()[:4]
destination_mapping = cache / f"pypi-mapping-{url_hash}.yaml"
destination_etag = destination_mapping.with_suffix(".etag")
destination_lock = destination_mapping.with_suffix(".lock")

# Wait for any other process to finish downloading the file.
# Use the ETag to avoid downloading the file if it hasn't changed.
# Otherwise, download the file and cache the contents and ETag.
while True:
try:
with FileLock(destination_lock, timeout=5):
# Return the contents immediately if the file is fresh
try:
mtime = destination_mapping.stat().st_mtime
age = current_time - mtime
if age < DONT_CHECK_IF_NEWER_THAN_SECONDS:
contents = destination_mapping.read_bytes()
logger.debug(
f"Using cached mapping {destination_mapping} without "
f"checking for updates"
)
return contents
except FileNotFoundError:
pass
# Get the ETag from the last download, if it exists
if destination_mapping.exists() and destination_etag.exists():
logger.debug(f"Old ETag found at {destination_etag}")
try:
old_etag = destination_etag.read_text().strip()
headers = {"If-None-Match": old_etag}
except FileNotFoundError:
logger.warning("Failed to read ETag")
headers = {}
else:
headers = {}
# Download the file and cache the result.
logger.debug(f"Requesting {url}")
res = requests.get(url, headers=headers)
if res.status_code == 304:
logger.debug(
f"{url} has not changed since last download, "
f"using {destination_mapping}"
)
else:
res.raise_for_status()
time.sleep(10)
destination_mapping.write_bytes(res.content)
if "ETag" in res.headers:
destination_etag.write_text(res.headers["ETag"])
else:
logger.warning("No ETag in response headers")
logger.debug(f"Downloaded {url} to {destination_mapping}")
return destination_mapping.read_bytes()

except Timeout:
logger.warning(
f"Failed to acquire lock on {destination_lock}, it is likely "
f"being downloaded by another process. Retrying..."
)
179 changes: 179 additions & 0 deletions conda_lock/lookup_cache.py
@@ -0,0 +1,179 @@
import hashlib
import logging
import platform
import re

from datetime import datetime
from pathlib import Path
from typing import Optional

import requests

from filelock import FileLock, Timeout
from platformdirs import user_cache_path


logger = logging.getLogger(__name__)


CLEAR_CACHE_AFTER_SECONDS = 60 * 60 * 24 * 2 # 2 days
"""Cached files older than this will be deleted."""

DONT_CHECK_IF_NEWER_THAN_SECONDS = 60 * 5 # 5 minutes
"""If the cached file is newer than this, just use it without checking for updates."""

WINDOWS_TIME_EPSILON = 0.005
"""Windows has issues with file timestamps, so we add this small offset
to ensure that newly created files have a positive age.
"""


def uncached_download_file(url: str) -> bytes:
"""The simple equivalent to cached_download_file."""
res = requests.get(url, headers={"User-Agent": "conda-lock"})
res.raise_for_status()
return res.content


def cached_download_file(
url: str,
*,
cache_subdir_name: str,
cache_root: Optional[Path] = None,
max_age_seconds: float = CLEAR_CACHE_AFTER_SECONDS,
dont_check_if_newer_than_seconds: float = DONT_CHECK_IF_NEWER_THAN_SECONDS,
) -> bytes:
"""Download a file and cache it in the user cache directory.
If the file is already cached, return the cached contents.
If the file is not cached, download it and cache the contents
and the ETag.
Protect against multiple processes downloading the same file.
"""
if cache_root is None:
cache_root = user_cache_path("conda-lock", appauthor=False)
cache = cache_root / "cache" / cache_subdir_name
cache.mkdir(parents=True, exist_ok=True)
clear_old_files_from_cache(cache, max_age_seconds=max_age_seconds)

destination_lock = (cache / cached_filename_for_url(url)).with_suffix(".lock")

# Wait for any other process to finish downloading the file.
# This way we can use the result from the current download without
# spawning multiple concurrent downloads.
while True:
try:
with FileLock(str(destination_lock), timeout=5):
return _download_to_or_read_from_cache(
url,
cache=cache,
dont_check_if_newer_than_seconds=dont_check_if_newer_than_seconds,
)
except Timeout:
logger.warning(
f"Failed to acquire lock on {destination_lock}, it is likely "
f"being downloaded by another process. Retrying..."
)


def _download_to_or_read_from_cache(
url: str, *, cache: Path, dont_check_if_newer_than_seconds: float
) -> bytes:
"""Download a file to the cache directory and return the contents.
This function is designed to be called from within a FileLock context to avoid
multiple processes downloading the same file.
If the file is newer than `dont_check_if_newer_than_seconds`, then immediately
return the cached contents. Otherwise we pass the ETag from the last download
in the headers to avoid downloading the file if it hasn't changed remotely.
"""
destination = cache / cached_filename_for_url(url)
destination_etag = destination.with_suffix(".etag")
request_headers = {"User-Agent": "conda-lock"}
# Return the contents immediately if the file is fresh
if destination.is_file():
age_seconds = get_age_seconds(destination)
if 0 <= age_seconds < dont_check_if_newer_than_seconds:
logger.debug(
f"Using cached mapping {destination} of age {age_seconds}s "
f"without checking for updates"
)
return destination.read_bytes()
# Add the ETag from the last download, if it exists, to the headers.
# The ETag is used to avoid downloading the file if it hasn't changed remotely.
# Otherwise, download the file and cache the contents and ETag.
if destination_etag.is_file():
old_etag = destination_etag.read_text().strip()
request_headers["If-None-Match"] = old_etag
# Download the file and cache the result.
logger.debug(f"Requesting {url}")
res = requests.get(url, headers=request_headers)
if res.status_code == 304:
logger.debug(f"{url} has not changed since last download, using {destination}")
else:
res.raise_for_status()
destination.write_bytes(res.content)
if "ETag" in res.headers:
destination_etag.write_text(res.headers["ETag"])
else:
logger.warning("No ETag in response headers")
logger.debug(f"Downloaded {url} to {destination}")
return destination.read_bytes()


def cached_filename_for_url(url: str) -> str:
"""Return a filename for a URL that is probably unique to the URL.
The filename is a 4-character hash of the URL, followed by the extension.
If the extension is not alphanumeric or too long, it is omitted.
>>> cached_filename_for_url("https://example.com/foo.json")
'a5d7.json'
>>> cached_filename_for_url("https://example.com/foo")
'5ea6'
>>> cached_filename_for_url("https://example.com/foo.bär")
'2191'
>>> cached_filename_for_url("https://example.com/foo.baaaaaar")
'1861'
"""
url_hash = hashlib.sha256(url.encode()).hexdigest()[:4]
extension = url.split(".")[-1]
if len(extension) <= 6 and re.match("^[a-zA-Z0-9]+$", extension):
return f"{url_hash}.{extension}"
else:
return f"{url_hash}"


def clear_old_files_from_cache(cache: Path, *, max_age_seconds: float) -> None:
"""Remove files in the cache directory older than `max_age_seconds`.
Also removes any files that somehow have a modification time in the future.
For safety, this raises an error if `cache` is not a subdirectory of
a directory named `"cache"`.
"""
if not cache.parent.name == "cache":
raise ValueError(
f"Expected cache directory, got {cache}. Parent should be 'cache' ",
f"not '{cache.parent.name}'",
)
for file in cache.iterdir():
age_seconds = get_age_seconds(file)
if age_seconds < 0 or age_seconds >= max_age_seconds:
logger.debug(f"Removing old cache file {file} of age {age_seconds}s")
file.unlink()


def get_age_seconds(path: Path) -> float:
"""Return the age of a file in seconds.
On Windows, the age of a new file is sometimes slightly negative, so we add a small
offset to ensure that the age is positive.
"""
raw_age = datetime.now().timestamp() - path.stat().st_mtime
if platform.system() == "Windows":
return raw_age + WINDOWS_TIME_EPSILON
else:
return raw_age
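
A hypothetical usage sketch of the new conda_lock/lookup_cache.py helpers (not part of the diff; the URL and cache_root are illustrative assumptions):

    from pathlib import Path

    from conda_lock.lookup_cache import cached_download_file, cached_filename_for_url

    url = "https://example.com/mapping.yaml"  # placeholder URL
    contents = cached_download_file(
        url,
        cache_subdir_name="pypi-mapping",    # same subdir name that lookup.py now passes
        cache_root=Path("./.demo-cache"),    # omit to use the platformdirs user cache
        max_age_seconds=60 * 60,             # evict cached copies older than one hour
        dont_check_if_newer_than_seconds=60, # reuse files under a minute old without an ETag check
    )
    # Cached files are named by a short hash of the URL plus its extension, e.g.:
    print(cached_filename_for_url(url))  # something like 'abcd.yaml'

Repeated calls within dont_check_if_newer_than_seconds read straight from the cache; otherwise the stored ETag is sent as If-None-Match so an unchanged remote file is not re-downloaded.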