Skip to content

Commit

Permalink
Added a few type hints and opened Pandoras box.
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthiasZepper committed Feb 27, 2024
1 parent dceb7b3 commit e64f1a7
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions nf_core/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import tarfile
import textwrap
from datetime import datetime
from typing import List, Optional, Tuple
from zipfile import ZipFile

import git
Expand Down Expand Up @@ -978,7 +979,7 @@ def prioritize_direct_download(self, container_list):
d[k] = c
return sorted(list(d.values()))

def gather_registries(self, workflow_directory):
def gather_registries(self, workflow_directory: str) -> None:
"""Fetch the registries from the pipeline config and CLI arguments and store them in a set.
This is needed to symlink downloaded container images so Nextflow will find them.
"""
Expand All @@ -1002,7 +1003,7 @@ def gather_registries(self, workflow_directory):
# add depot.galaxyproject.org to the set, because it is the default registry for singularity hardcoded in modules
self.registry_set.add("depot.galaxyproject.org")

def symlink_singularity_images(self, image_out_path):
def symlink_singularity_images(self, image_out_path: str) -> None:
"""Create a symlink for each registry in the registry set that points to the image.
We have dropped the explicit registries from the modules in favor of the configurable registries.
Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed.
Expand Down Expand Up @@ -1042,7 +1043,7 @@ def symlink_singularity_images(self, image_out_path):
finally:
os.close(image_dir)

def get_singularity_images(self, current_revision=""):
def get_singularity_images(self, current_revision: str = "") -> None:
"""Loop through container names and download Singularity images"""

if len(self.containers) == 0:
Expand All @@ -1060,10 +1061,10 @@ def get_singularity_images(self, current_revision=""):
)

# Organise containers based on what we need to do with them
containers_exist = []
containers_cache = []
containers_download = []
containers_pull = []
containers_exist: List[str] = []
containers_cache: List[Tuple[str, str, Optional[str]]] = []
containers_download: List[Tuple[str, str, Optional[str]]] = []
containers_pull: List[Tuple[str, str, Optional[str]]] = []
for container in self.containers:
# Fetch the output and cached filenames for this container
out_path, cache_path = self.singularity_image_filenames(container)
Expand All @@ -1086,16 +1087,16 @@ def get_singularity_images(self, current_revision=""):

# We have a copy of this in the NXF_SINGULARITY_CACHE dir
if cache_path and os.path.exists(cache_path):
containers_cache.append([container, out_path, cache_path])
containers_cache.append((container, out_path, cache_path))
continue

# Direct download within Python
if container.startswith("http"):
containers_download.append([container, out_path, cache_path])
containers_download.append((container, out_path, cache_path))
continue

# Pull using singularity
containers_pull.append([container, out_path, cache_path])
containers_pull.append((container, out_path, cache_path))

# Exit if we need to pull images and Singularity is not installed
if len(containers_pull) > 0:
Expand Down Expand Up @@ -1127,8 +1128,8 @@ def get_singularity_images(self, current_revision=""):

# Kick off concurrent downloads
future_downloads = [
pool.submit(self.singularity_download_image, *container, progress)
for container in containers_download
pool.submit(self.singularity_download_image, *containers, progress)
for containers in containers_download
]

# Make ctrl-c work with multi-threading
Expand All @@ -1153,13 +1154,13 @@ def get_singularity_images(self, current_revision=""):
# Re-raise exception on the main thread
raise

for container in containers_pull:
for containers in containers_pull:
progress.update(task, description="Pulling singularity images")
# it is possible to try multiple registries / mirrors if multiple were specified.
# Iteration happens over a copy of self.container_library[:], as I want to be able to remove failing registries for subsequent images.
for library in self.container_library[:]:
try:
self.singularity_pull_image(*container, library, progress)
self.singularity_pull_image(*containers, library, progress)
# Pulling the image was successful, no ContainerError was raised, break the library loop
break
except ContainerError.ImageExistsError:
Expand Down Expand Up @@ -1196,12 +1197,12 @@ def get_singularity_images(self, current_revision=""):
# The else clause executes after the loop completes normally.
# This means the library loop completed without breaking, indicating failure for all libraries (registries)
log.error(
f"Not able to pull image of {container}. Service might be down or internet connection is dead."
f"Not able to pull image of {containers}. Service might be down or internet connection is dead."
)
# Task should advance in any case. Failure to pull will not kill the download process.
progress.update(task, advance=1)

def singularity_image_filenames(self, container):
def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str]]:
"""Check Singularity cache for image, copy to destination folder if found.
Args:
Expand Down Expand Up @@ -1258,7 +1259,7 @@ def singularity_image_filenames(self, container):

return (out_path, cache_path)

def singularity_copy_cache_image(self, container, out_path, cache_path):
def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: Optional[str]) -> None:
"""Copy Singularity image from NXF_SINGULARITY_CACHEDIR to target folder."""
# Copy to destination folder if we have a cached version
if cache_path and os.path.exists(cache_path):
Expand All @@ -1267,7 +1268,9 @@ def singularity_copy_cache_image(self, container, out_path, cache_path):
# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(out_path)

def singularity_download_image(self, container, out_path, cache_path, progress):
def singularity_download_image(
self, container: str, out_path: str, cache_path: Optional[str], progress: DownloadProgress
) -> None:
"""Download a singularity image from the web.
Use native Python to download the file.
Expand Down Expand Up @@ -1314,7 +1317,6 @@ def singularity_download_image(self, container, out_path, cache_path, progress):

# Rename partial filename to final filename
os.rename(output_path_tmp, output_path)
output_path_tmp = None

# Copy cached download if we are using the cache
if cache_path:
Expand All @@ -1339,8 +1341,12 @@ def singularity_download_image(self, container, out_path, cache_path, progress):
os.remove(output_path)
# Re-raise the caught exception
raise
finally:
del output_path_tmp

def singularity_pull_image(self, container, out_path, cache_path, library, progress):
def singularity_pull_image(
self, container: str, out_path: str, cache_path: Optional[str], library: List[str], progress: DownloadProgress
) -> None:
"""Pull a singularity image using ``singularity pull``
Attempt to use a local installation of singularity to pull the image.
Expand All @@ -1355,6 +1361,11 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr
"""
output_path = cache_path or out_path

# where the output of 'singularity pull' is first generated before being copied to the NXF_SINGULARITY_CACHDIR.
# if not defined by the Singularity administrators, then use the temporary directory to avoid storing the images in the work directory.
if os.environ.get("SINGULARITY_CACHEDIR") is None:
os.environ["SINGULARITY_CACHEDIR"] = NFCORE_CACHE_DIR

# Sometimes, container still contain an explicit library specification, which
# resulted in attempted pulls e.g. from docker://quay.io/quay.io/qiime2/core:2022.11
# Thus, if an explicit registry is specified, the provided -l value is ignored.
Expand Down Expand Up @@ -1399,9 +1410,10 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr
bufsize=1,
) as proc:
lines = []
for line in proc.stdout:
lines.append(line)
progress.update(task, current_log=line.strip())
if proc.stdout is not None:
for line in proc.stdout:
lines.append(line)
progress.update(task, current_log=line.strip())

if lines:
# something went wrong with the container retrieval
Expand All @@ -1428,7 +1440,7 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr

progress.remove_task(task)

def compress_download(self):
def compress_download(self) -> None:
"""Take the downloaded files and make a compressed .tar.gz archive."""
log.debug(f"Creating archive: {self.output_filename}")

Expand Down

0 comments on commit e64f1a7

Please sign in to comment.