From 20a952b0a087c55fda3611e72e4b413cc07a16dc Mon Sep 17 00:00:00 2001 From: Florian Zwagemaker Date: Mon, 27 Feb 2023 12:49:06 +0100 Subject: [PATCH] feat!: addition of proper logging functionality for the ViroConstrictor package feat!: replace snakemake logging output through our own log handler for unified output fix: suppress snakemake logging output (workaround for https://github.com/snakemake/snakemake/issues/2089) refactor!: Use a generic (`__main__.py`) top level entry-point instead of the named `ViroConstrictor.py` entrypoint refactor: re-structure argument parsing functionalities into its own class refactor: re-structure snakemake run-information and config functionalities into its own class refactor: remove old shell stdout-coloring method with the rich library refactor: simplify several functions to ensure a properly defined return refactor: Use f-strings more consistently for i.e. string concatenation with variables refactor: add type-hints to all functions fix: ensure `samples_df` and `samples_dict` always contain the same information --- ViroConstrictor/ViroConstrictor.py | 194 ----- ViroConstrictor/__main__.py | 192 +++++ ViroConstrictor/functions.py | 82 +- ViroConstrictor/logging.py | 199 +++++ ViroConstrictor/parser.py | 1037 +++++++++++++++---------- ViroConstrictor/runconfigs.py | 260 +++---- ViroConstrictor/runreport.py | 36 +- ViroConstrictor/samplesheet.py | 75 +- ViroConstrictor/update.py | 60 +- ViroConstrictor/userprofile.py | 99 ++- ViroConstrictor/validatefasta.py | 109 ++- ViroConstrictor/workflow/presets.py | 9 +- ViroConstrictor/workflow/workflow.smk | 22 +- setup.py | 8 +- 14 files changed, 1352 insertions(+), 1030 deletions(-) delete mode 100644 ViroConstrictor/ViroConstrictor.py create mode 100644 ViroConstrictor/__main__.py create mode 100644 ViroConstrictor/logging.py diff --git a/ViroConstrictor/ViroConstrictor.py b/ViroConstrictor/ViroConstrictor.py deleted file mode 100644 index 85d39e0..0000000 --- a/ViroConstrictor/ViroConstrictor.py +++ /dev/null @@ -1,194 +0,0 @@ -""" -Starting point of the ViroConstrictor pipeline and wrapper - -Copyright © 2021 RIVM - -https://github.com/RIVM-bioinformatics/ViroConstrictor -""" - -# pylint: disable=C0103 - -import os -import sys - -import snakemake -import yaml -import pandas as pd - -from ViroConstrictor import __version__ -from ViroConstrictor.functions import color -from ViroConstrictor.parser import ValidArgs -from ViroConstrictor.runconfigs import SnakemakeConfig, SnakemakeParams, WriteYaml -from ViroConstrictor.runreport import WriteReport -from ViroConstrictor.update import update -from ViroConstrictor.userprofile import ReadConfig -from ViroConstrictor.validatefasta import CheckReferenceFile - -yaml.warnings({"YAMLLoadWarning": False}) - - -def CheckSampleProperties(sampleinfo): - """This function checks that the reference fasta file exists and that it is a valid fasta file - - Parameters - ---------- - sampleinfo - A dictionary of dictionaries. The outer dictionary is keyed by sample name, and the inner - dictionary should contain the "REFERENCE" key containing a path to the reference fasta file. - - """ - for sample in sampleinfo: - if not os.path.isfile(sampleinfo.get(sample).get("REFERENCE")): - raise FileNotFoundError( - f"\n{color.RED + color.BOLD}The given reference fasta file for sample '{sample}' does not exist. Please check the reference fasta and try again. Exiting...{color.END}\n" - ) - - reference_files = {sampleinfo.get(sample).get("REFERENCE") for sample in sampleinfo} - for f in reference_files: - CheckReferenceFile(f) - - -def main(): - """ - ViroConstrictor starting point - --> Fetch and parse arguments - --> check validity - --> Read (or write, if necessary) the user-config files - --> Change working directories and make necessary local files for snakemake - --> Run snakemake with appropriate settings - """ - - ##> Check the default userprofile, make it if it doesn't exist - conf = ReadConfig(os.path.expanduser("~/.ViroConstrictor_defaultprofile.ini")) - - flags, sampleinfo, samples_df = ValidArgs(sys.argv[1:]) - samples_df = samples_df.reset_index(drop=False).rename(columns={"index": "SAMPLE"}) - sampleinfo_df = pd.DataFrame.from_dict(sampleinfo, orient="index").reset_index(drop=False).rename(columns={"index": "SAMPLE"}) - - preset_fallback_warnings = [] - preset_score_warnings = [] - for s in sampleinfo_df.itertuples(): - sample, preset, score, input_target = ( - s.SAMPLE, - s.PRESET, - s.PRESET_SCORE, - s.VIRUS, - ) - if score == 0.0: - warn = Warning( - f"""Sample '{sample}' was given the following information as an input-target: '{input_target}'. -This information could however not be used to determine a preset. Because of this, the preset '{preset}' was used instead. -Please check your input-target for any significant misspellings, or consider using an alias or a different abbreviation for your input-target to check whether this resolves the issue. - -It may be that your input-target does not yet have an associated preset in ViroConstrictor. -If your suspect this to be the case, please open an issue on the ViroConstrictor GitHub page: https://github.com/RIVM-bioinformatics/ViroConstrictor""" - ) - preset_fallback_warnings.append(warn) - continue - if score < 0.8: - warn = Warning( - f"""Sample '{sample}' was given the following information as an input-target: '{input_target}'. -As a result, the preset '{preset}' was chosen. But this was done with less than 80% certainty. -Certainty score: {score:.0%} -Please check the input-target and try again if a different preset is required.""" - ) - preset_score_warnings.append(warn) - continue - - if not flags.skip_updates: - update(sys.argv, conf) - - inpath = os.path.abspath(flags.input) - start_path = os.getcwd() - - outpath = os.path.abspath(flags.output) - - exec_folder = os.path.abspath(os.path.dirname(__file__)) - - Snakefile = os.path.join(exec_folder, "workflow", "workflow.smk") - - CheckSampleProperties(sampleinfo) # raises errors if stuff is not right - - ##@ check if the output dir exists, create if not - ##@ change the working directory - if not os.path.exists(outpath): - os.makedirs(outpath) - - if os.getcwd() != outpath: - os.chdir(outpath) - workdir = outpath - - samplesheet = WriteYaml(sampleinfo, f"{workdir}/samplesheet.yaml") - run_config = SnakemakeConfig(conf, flags.threads, flags.dryrun) - run_params = SnakemakeParams( - conf, - flags.threads, - sampleinfo, - flags.platform, - samplesheet, - flags.amplicon_type, - ) - - if conf["COMPUTING"]["compmode"] == "local": - status = snakemake.snakemake( - Snakefile, - workdir=workdir, - cores=run_config["cores"], - use_conda=run_config["use-conda"], - conda_frontend="mamba", - jobname=run_config["jobname"], - latency_wait=run_config["latency-wait"], - dryrun=run_config["dryrun"], - configfiles=[WriteYaml(run_params, f"{workdir}/config/run_params.yaml")], - restart_times=3, - keepgoing=True, - ) - if conf["COMPUTING"]["compmode"] == "grid": - status = snakemake.snakemake( - Snakefile, - workdir=workdir, - cores=run_config["cores"], - nodes=run_config["cores"], - use_conda=run_config["use-conda"], - conda_frontend="mamba", - jobname=run_config["jobname"], - latency_wait=run_config["latency-wait"], - drmaa=run_config["drmaa"], - drmaa_log_dir=run_config["drmaa-log-dir"], - dryrun=run_config["dryrun"], - configfiles=[WriteYaml(run_params, f"{workdir}/config/run_params.yaml")], - restart_times=3, - keepgoing=True, - ) - - if run_config["dryrun"] is False and status is True: - snakemake.snakemake( - Snakefile, - workdir=workdir, - report="results/snakemake_report.html", - configfiles=[WriteYaml(run_params, f"{workdir}/config/run_params.yaml")], - quiet=True, - ) - - workflow_state = "Failed" if status is False else "Success" - - WriteReport( - workdir, - inpath, - start_path, - conf, - run_params, - run_config, - workflow_state, - ) - - if preset_score_warnings and not flags.disable_presets: - for w in preset_score_warnings: - print(f"{color.YELLOW}Warning: {w}{color.END}\n") - if preset_fallback_warnings and not flags.disable_presets: - for w in preset_fallback_warnings: - print(f"{color.YELLOW + color.BOLD}Warning: {w}{color.END}\n") - - if status is False: - exit(1) - exit(0) diff --git a/ViroConstrictor/__main__.py b/ViroConstrictor/__main__.py new file mode 100644 index 0000000..0744083 --- /dev/null +++ b/ViroConstrictor/__main__.py @@ -0,0 +1,192 @@ +""" +Starting point of the ViroConstrictor pipeline and wrapper + +Copyright © 2021 RIVM + +https://github.com/RIVM-bioinformatics/ViroConstrictor +""" + +# pylint: disable=C0103 + +import sys +from typing import Literal, NoReturn + +import pandas as pd +import snakemake + +import ViroConstrictor.logging +from ViroConstrictor import __version__ +from ViroConstrictor.logging import log +from ViroConstrictor.parser import CLIparser +from ViroConstrictor.runconfigs import GetSnakemakeRunDetails, WriteYaml +from ViroConstrictor.runreport import WriteReport +from ViroConstrictor.update import update + + +def get_preset_warning_list( + sample_info_df: pd.DataFrame, +) -> tuple[list[str], list[str]]: + """Takes a dataframe with sample information and returns a tuple of two lists of warnings + + Parameters + ---------- + sample_info_df : pd.DataFrame + pd.DataFrame + + Returns + ------- + A list of warnings. + + """ + preset_fallback_warnings = preset_score_warnings = [] + for s in sample_info_df.itertuples(): + sample, preset, score, input_target = ( + s.SAMPLE, + s.PRESET, + s.PRESET_SCORE, + s.VIRUS, + ) + if score == 0.0: + warn = f"""[red]Sample '[bold underline]{sample}[/bold underline]' was given the following information as an input-target: '[bold underline]{input_target}[/bold underline]'. +This information could however not be used to determine a preset. Because of this, the preset '[bold underline]{preset}[/bold underline]' was used instead.[/red] +[yellow]Please check your input-target for any significant misspellings, or consider using an alias or a different abbreviation for your input-target to check whether this resolves the issue.[/yellow] + +It may be that your input-target does not yet have an associated preset in ViroConstrictor. +If your suspect this to be the case, please open an issue on the ViroConstrictor GitHub page: [magenta underline]https://github.com/RIVM-bioinformatics/ViroConstrictor[/magenta underline]""" + preset_fallback_warnings.append(warn) + continue + if score < 0.8: + warn = f"""[red]Sample '[bold underline]{sample}[/bold underline]' was given the following information as an input-target: '[bold underline]{input_target}[/bold underline]'. +As a result, the preset '{preset}' was chosen. But this was done with less than 80% certainty.[/red] +[yellow]Certainty score: [bold]{score:.0%}[/bold][/yellow] +Please check the input-target and try again if a different preset is required.""" + preset_score_warnings.append(warn) + continue + return preset_fallback_warnings, preset_score_warnings + + +def show_preset_warnings( + warnings: list[str], fallbacks: list[str], disabled: bool +) -> None: + if warnings and not disabled: + for w in warnings: + log.warn(f"{w}") + if fallbacks and not disabled: + for w in fallbacks: + log.warn(f"{w}") + + +def main() -> NoReturn: + """ + ViroConstrictor starting point + --> Fetch and parse arguments + --> check validity + --> Read (or write, if necessary) the user-config files + --> Change working directories and make necessary local files for snakemake + --> Run snakemake with appropriate settings + """ + + parsed_input = CLIparser(input_args=sys.argv[1:]) + + preset_fallback_warnings, preset_score_warnings = get_preset_warning_list( + parsed_input.samples_df + ) + if not parsed_input.flags.skip_updates: + update(sys.argv, parsed_input.user_config) + + snakemake_run_details = GetSnakemakeRunDetails(inputs_obj=parsed_input) + + log.info(f"{'='*20} [bold yellow] Starting Workflow [/bold yellow] {'='*20}") + status: bool = False + + if parsed_input.user_config["COMPUTING"]["compmode"] == "local": + status = snakemake.snakemake( + snakefile=parsed_input.snakefile, + workdir=parsed_input.workdir, + cores=snakemake_run_details.snakemake_run_conf["cores"], + use_conda=snakemake_run_details.snakemake_run_conf["use-conda"], + conda_frontend="mamba", + jobname=snakemake_run_details.snakemake_run_conf["jobname"], + latency_wait=snakemake_run_details.snakemake_run_conf["latency-wait"], + dryrun=snakemake_run_details.snakemake_run_conf["dryrun"], + configfiles=[ + WriteYaml( + snakemake_run_details.snakemake_run_parameters, + f"{parsed_input.workdir}/config/run_params.yaml", + ) + ], + restart_times=3, + keepgoing=True, + quiet=["all"], # type: ignore + log_handler=[ + ViroConstrictor.logging.snakemake_logger(logfile=parsed_input.logfile) + ], + printshellcmds=False, + ) + if parsed_input.user_config["COMPUTING"]["compmode"] == "grid": + status = snakemake.snakemake( + snakefile=parsed_input.snakefile, + workdir=parsed_input.workdir, + cores=snakemake_run_details.snakemake_run_conf["cores"], + nodes=snakemake_run_details.snakemake_run_conf["cores"], + use_conda=snakemake_run_details.snakemake_run_conf["use-conda"], + conda_frontend="mamba", + jobname=snakemake_run_details.snakemake_run_conf["jobname"], + latency_wait=snakemake_run_details.snakemake_run_conf["latency-wait"], + drmaa=snakemake_run_details.snakemake_run_conf["drmaa"], + drmaa_log_dir=snakemake_run_details.snakemake_run_conf["drmaa-log-dir"], + dryrun=snakemake_run_details.snakemake_run_conf["dryrun"], + configfiles=[ + WriteYaml( + snakemake_run_details.snakemake_run_parameters, + f"{parsed_input.workdir}/config/run_params.yaml", + ) + ], + restart_times=3, + keepgoing=True, + quiet=["all"], # type: ignore + log_handler=[ + ViroConstrictor.logging.snakemake_logger(logfile=parsed_input.logfile) + ], + ) + + if snakemake_run_details.snakemake_run_conf["dryrun"] is False and status is True: + snakemake.snakemake( + snakefile=parsed_input.snakefile, + workdir=parsed_input.workdir, + report="results/snakemake_report.html", + configfiles=[ + WriteYaml( + snakemake_run_details.snakemake_run_parameters, + f"{parsed_input.workdir}/config/run_params.yaml", + ) + ], + quiet=["all"], # type: ignore + log_handler=[ + ViroConstrictor.logging.snakemake_logger(logfile=parsed_input.logfile) + ], + ) + + workflow_state: Literal["Failed", "Success"] = ( + "Failed" if status is False else "Success" + ) + + WriteReport( + parsed_input.workdir, + parsed_input.input_path, + parsed_input.exec_start_path, + parsed_input.user_config, + snakemake_run_details.snakemake_run_parameters, + snakemake_run_details.snakemake_run_conf, + workflow_state, + ) + + show_preset_warnings( + preset_score_warnings, + preset_fallback_warnings, + parsed_input.flags.disable_presets, + ) + + if status is False: + exit(1) + exit(0) diff --git a/ViroConstrictor/functions.py b/ViroConstrictor/functions.py index f4a3bb3..5fd05fc 100644 --- a/ViroConstrictor/functions.py +++ b/ViroConstrictor/functions.py @@ -6,11 +6,11 @@ import glob import os -import re as _re +import re import readline import shutil import textwrap -from argparse import SUPPRESS, ArgumentParser, HelpFormatter +from argparse import SUPPRESS, Action, ArgumentParser, HelpFormatter from typing import IO, Optional import rich @@ -36,23 +36,24 @@ class FlexibleArgFormatter(HelpFormatter): * Changes the behaviour of the metavar to be only printed once per long AND shorthand argument, instead of printing the metavar multiple times for every possible flag. """ - def __init__(self, prog): + def __init__(self, prog: str) -> None: term_width = shutil.get_terminal_size().columns max_help_position = min(max(24, term_width // 2), 80) super().__init__(prog, max_help_position=max_help_position) - def _get_help_string(self, action): + def _get_help_string(self, action: Action) -> Optional[str]: """ """ - help_text = action.help + help_text: Optional[str] = action.help if ( - action.default != SUPPRESS + help_text is not None + and action.default != SUPPRESS and "default" not in help_text.lower() and action.default is not None ): help_text += f"\n ([underline]default: {str(action.default)}[/underline])" return help_text - def _format_action_invocation(self, action): + def _format_action_invocation(self, action: Action) -> str: """ """ if not action.option_strings or action.nargs == 0: return super()._format_action_invocation(action) @@ -60,35 +61,35 @@ def _format_action_invocation(self, action): args_string = self._format_args(action, default) return ", ".join(action.option_strings) + " " + args_string - def _split_lines(self, text, width): + def _split_lines(self, text: str, width: int) -> list[str]: return self._para_reformat(text, width) - def _fill_text(self, text, width, indent): + def _fill_text(self, text: str, width: int, indent: str) -> str: lines = self._para_reformat(text, width) return "\n".join(lines) - def _indents(self, line): + def _indents(self, line: str) -> tuple[int, int]: """Return line indent level and "sub_indent" for bullet list text.""" - indent = len(_re.match(r"( *)", line).group(1)) - if list_match := _re.match(r"( *)(([*\-+>]+|\w+\)|\w+\.) +)", line): - sub_indent = indent + len(list_match.group(2)) + matches = re.match(r"( *)", line) + indent = len(matches[1]) if matches else 0 + if list_match := re.match(r"( *)(([*\-+>]+|\w+\)|\w+\.) +)", line): + sub_indent = indent + len(list_match[2]) else: sub_indent = indent - return (indent, sub_indent) - def _split_paragraphs(self, text): + def _split_paragraphs(self, text: str) -> list[str]: """Split text in to paragraphs of like-indented lines.""" text = textwrap.dedent(text).strip() - text = _re.sub("\n\n[\n]+", "\n\n", text) + text = re.sub("\n\n[\n]+", "\n\n", text) - last_sub_indent = None - paragraphs = [] + last_sub_indent: Optional[int] = None + paragraphs: list[str] = [] for line in text.splitlines(): (indent, sub_indent) = self._indents(line) - is_text = _re.search(r"[^\s]", line) is not None + is_text = re.search(r"[^\s]", line) is not None if is_text and indent == sub_indent == last_sub_indent: paragraphs[-1] += f" {line}" @@ -98,16 +99,15 @@ def _split_paragraphs(self, text): last_sub_indent = sub_indent if is_text else None return paragraphs - def _para_reformat(self, text, width): + def _para_reformat(self, text: str, width: int) -> list[str]: """Reformat text, by paragraph.""" - paragraphs = [] + paragraphs: list[str] = [] for paragraph in self._split_paragraphs(text): - (indent, sub_indent) = self._indents(paragraph) paragraph = self._whitespace_matcher.sub(" ", paragraph).strip() - new_paragraphs = textwrap.wrap( + new_paragraphs: list[str] = textwrap.wrap( text=paragraph, width=width, initial_indent=" " * indent, @@ -130,23 +130,6 @@ def _print_message(self, message: str, file: Optional[IO[str]] = None) -> None: return rich.print(message) -class color: - """ - define basic colors to use in the terminal - """ - - PURPLE = "\033[95m" - CYAN = "\033[96m" - DARKCYAN = "\033[36m" - BLUE = "\033[94m" - GREEN = "\033[92m" - YELLOW = "\033[93m" - RED = "\033[91m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" - END = "\033[0m" - - # tabCompleter Class taken from https://gist.github.com/iamatypeofwalrus/5637895 ## this was intended for the raw_input() function of python. But that one is deprecated now ## However, this also seems to work for the new input() functions @@ -159,7 +142,7 @@ class tabCompleter: http://stackoverflow.com/questions/5637124/tab-completion-in-pythons-raw-input """ - def pathCompleter(self, text, state): + def pathCompleter(self, text: str, state: int) -> str: """ This is the tab completer for systems paths. Only tested on *nix systems @@ -174,9 +157,9 @@ def pathCompleter(self, text, state): if os.path.isdir(text): text += "/" - return [x for x in glob.glob(text + "*")][state] + return list(glob.glob(f"{text}*"))[state] - def createListCompleter(self, ll): + def createListCompleter(self, ll: list[str]) -> None: """ This is a closure that creates a method that autocompletes from the given list. @@ -186,12 +169,13 @@ def createListCompleter(self, ll): from. """ - def listCompleter(text, state): - line = readline.get_line_buffer() + def listCompleter(text: str, state: int) -> Optional[str]: + line: str = readline.get_line_buffer() - if not line: - return [c + " " for c in ll][state] - - return [c + " " for c in ll if c.startswith(line)][state] + return ( + [c for c in ll if c.startswith(line)][state] + if line + else [f"{c} " for c in ll][state] + ) self.listCompleter = listCompleter diff --git a/ViroConstrictor/logging.py b/ViroConstrictor/logging.py new file mode 100644 index 0000000..8731574 --- /dev/null +++ b/ViroConstrictor/logging.py @@ -0,0 +1,199 @@ +import datetime +import logging +import os +import pathlib +import re +from typing import Any + +from rich.color import ANSI_COLOR_NAMES +from rich.default_styles import DEFAULT_STYLES +from rich.highlighter import NullHighlighter +from rich.logging import RichHandler + +richstyles = list(set(list(DEFAULT_STYLES.keys()) + list(ANSI_COLOR_NAMES.keys()))) + + +class StripBracketsFilter(logging.Filter): + def filter(self, record): + pattern = r"\[/?\b(%s)\b[^\]]*\]" % "|".join(richstyles) + record.msg = re.sub(pattern, "", record.msg) + record.msg = record.msg.replace("\n", "\n\t\t\t\t\t\t\t") + return True + + +log = logging.getLogger("ViroConstrictor") +log.setLevel("INFO") + + +def setup_logger(workdir: str) -> str: + logging.getLogger("asyncio").setLevel(logging.CRITICAL) + logging.getLogger("snakemake").setLevel(logging.CRITICAL) + logging.getLogger("smart_open").setLevel(logging.CRITICAL) + logging.getLogger("urllib3").setLevel(logging.CRITICAL) + logging.getLogger("fpdf").setLevel(logging.CRITICAL) + + if not os.path.exists(workdir): + os.makedirs(workdir) + logfile = f"{os.path.abspath(workdir)}/ViroConstrictor_{datetime.datetime.now().isoformat()}.log" + + shell_handler = RichHandler( + show_path=False, + omit_repeated_times=False, + markup=True, + highlighter=NullHighlighter(), + rich_tracebacks=True, + ) + file_handler = logging.FileHandler(logfile) + + shell_handler.setLevel("INFO") + file_handler.setLevel("INFO") + + format_shell = "%(message)s" + format_file = "%(asctime)s\t%(levelname)s\t%(message)s" + + shell_formatter = logging.Formatter(format_shell, "[%d/%m/%y %H:%M:%S]") + file_formatter = logging.Formatter(format_file, "[%d/%m/%y %H:%M:%S]") + + shell_handler.setFormatter(shell_formatter) + file_handler.addFilter(StripBracketsFilter()) + file_handler.setFormatter(file_formatter) + + log.addHandler(shell_handler) + log.addHandler(file_handler) + + return logfile + + +def BaseLogMessage(msg: dict[str, Any]) -> None: + if logmessage := msg.get("msg"): + log.info(f"{logmessage}") + + +def CondaEnvInstallerPreamble(msg: dict[str, Any]) -> None: + if logmessage := msg.get("msg"): + log.info(f"Creating conda environment: [yellow]{logmessage}[/yellow]") + + +def CondaEnvInstallSuccess(msg: dict[str, Any]) -> None: + log.info("[green]Conda environment created![/green]") + + +def BaseSnakemakeAbortMessage(msg: dict[str, Any]) -> None: + if logmessage := msg.get("msg"): + log.error(f"[red]{logmessage}[/red]") + + +def ColorizeLogMessagePath(msg: dict[str, Any]) -> None: + if logmessage := msg.get("msg"): + msgparts = logmessage.split(" ") + for word in msgparts: + word = word.replace("...", "") if "..." in word else word.rstrip(".") + if pathlib.Path(word).exists(): + logmessage = logmessage.replace(word, f"[magenta]{word}[/magenta]") + log.info(f"{logmessage}") + + +def LogFileOverride(msg: dict[str, Any], logfile) -> None: + if msg.get("msg") is not None: + log.info(f"Complete log: [magenta]{logfile}[/magenta]") + + +def HandleJobInfoMessage(msg: dict[str, Any]) -> None: + if not (processname := msg.get("name")): + return + wildcards = msg.get("wildcards") + sample = wildcards.get("sample") if wildcards else None + input_target = wildcards.get("Virus") if wildcards else None + refid = wildcards.get("RefID") if wildcards else None + logfile = msg.get("log") + logfile = pathlib.Path.absolute(pathlib.Path(logfile[0])) if logfile else None + if msg.get("local"): + log.info( + f"Executing localjob [green underline]{processname}[/green underline] for sample [blue]{sample}[/blue] with target [blue]{input_target}[/blue] and reference-id [blue]{refid}[/blue]\nJob is using jobID [cyan]{msg.get('jobid')}[/cyan], logging output will be written to [magenta]{logfile}[/magenta]" + ) + else: + log.info( + f"Executing job [green underline]{processname}[/green underline] for sample [blue]{sample}[/blue] with target [blue]{input_target}[/blue] and reference-id [blue]{refid}[/blue]\nJob is using jobID [cyan]{msg.get('jobid')}[/cyan], logging output will be written to [magenta]{logfile}[/magenta]" + ) + + +def HandleJobErrorMessage(msg: dict[str, Any]) -> None: + if not (processname := msg.get("name")): + return + wildcards = msg.get("wildcards") + sample = wildcards.get("sample") if wildcards else None + input_target = wildcards.get("Virus") if wildcards else None + refid = wildcards.get("RefID") if wildcards else None + logfile = msg.get("log") + logfile = pathlib.Path.absolute(pathlib.Path(logfile[0])) if logfile else None + shellcmd = str(msg.get("shellcmd")).strip() + if outputfiles := msg.get("output"): + outputfiles_list = " ".join(list(outputfiles)) + else: + outputfiles_list = "None" + log.error( + f"Job [red underline]{processname}[/red underline] for sample [blue]{sample}[/blue] with target [blue]{input_target}[/blue] and reference-id [blue]{refid}[/blue] failed!\nJob is using jobID [cyan]{msg.get('jobid')}[/cyan], logging output will be written to [magenta]{logfile}[/magenta]\nThe expected output file(s) are as follows:\n[red]{outputfiles_list}[/red]\nThe following shell command was issued:\n[red]{shellcmd}[/red]" + ) + + +def print_jobstatistics_logmessage(msg: dict) -> None: + if logmessage := msg.get("msg"): + logmessage = logmessage.split("\n", 1)[1] + log.info(f"Job statistics:\n[yellow]{logmessage}[/yellow]") + + +logmessage_strings_info: dict[str, Any] = { + "Activating conda environment": ColorizeLogMessagePath, + "Building DAG of jobs": BaseLogMessage, + "Creating conda environment": ColorizeLogMessagePath, + "Removing incomplete Conda environment": ColorizeLogMessagePath, + "Downloading and installing remote packages": CondaEnvInstallerPreamble, + "Environment for": CondaEnvInstallSuccess, + "Cancelling snakemake on user request": BaseSnakemakeAbortMessage, + "Select jobs to execute...": BaseLogMessage, + "Creating report...": BaseLogMessage, + "Downloading resources and rendering HTML.": BaseLogMessage, + "Report created: results/snakemake_report.html.": ColorizeLogMessagePath, + "Terminating processes on user request, this might take some time.": BaseSnakemakeAbortMessage, + "Removing temporary output": ColorizeLogMessagePath, + "will be created.": ColorizeLogMessagePath, + "Nothing to be done (all requested files are present and up to date).": BaseLogMessage, +} +logmessage_strings_warning: list[str] = [ + "Your conda installation is not configured to use strict channel priorities." +] + + +def snakemake_logger(logfile: str) -> object: + def log_handler(msg: dict) -> None: + loglevel = msg.get("level") + logmessage = msg.get("msg") + + if loglevel == "dag_debug": + return None + if loglevel == "debug": + return None + if loglevel == "shellcmd": + return None + + if logmessage is not None and any( + x in logmessage for x in list(logmessage_strings_info.keys()) + ): + for key in logmessage_strings_info.keys(): + if key in logmessage: + logmessage_strings_info[key](msg) + + elif logmessage is not None and "Complete log:" in logmessage: + LogFileOverride(msg, logfile) + elif loglevel == "run_info": + print_jobstatistics_logmessage(msg) + elif loglevel == "job_info": + HandleJobInfoMessage(msg) + elif loglevel == "job_error": + HandleJobErrorMessage(msg) + elif loglevel == "warning": + log.warning(f"{logmessage}") + elif loglevel == "error": + log.error(f"{logmessage}") + + return log_handler diff --git a/ViroConstrictor/parser.py b/ViroConstrictor/parser.py index 9835800..084518a 100644 --- a/ViroConstrictor/parser.py +++ b/ViroConstrictor/parser.py @@ -4,118 +4,645 @@ import pathlib import re import sys +from typing import Any, Hashable import numpy as np import pandas as pd from ViroConstrictor import __prog__, __version__ -from ViroConstrictor.functions import FlexibleArgFormatter, RichParser, color +from ViroConstrictor.functions import FlexibleArgFormatter, RichParser +from ViroConstrictor.logging import log, setup_logger from ViroConstrictor.samplesheet import GetSamples +from ViroConstrictor.userprofile import ReadConfig +from ViroConstrictor.validatefasta import CheckReferenceFile from ViroConstrictor.workflow.presets import match_preset_name -def is_excel_file(ext): - """If the extension is in the list of Excel extensions, return True, otherwise return False +class CLIparser: + def __init__(self, input_args: list[str]) -> None: + self.flags: argparse.Namespace = self._get_args(input_args) + self.logfile = setup_logger(self.flags.output) + log.info(f"ViroConstrictor version: [blue]{__version__}[/blue]") + self.cli_errors = self._validate_cli_args() + if self.cli_errors: + for err in self.cli_errors: + log.error(err) + sys.exit(1) + self.user_config = ReadConfig( + pathlib.Path("~/.ViroConstrictor_defaultprofile.ini").expanduser() + ) + self.flags.presets = self.flags.disable_presets is False + self.samples_df = pd.DataFrame() + self.samples_dict: dict[Hashable, Any] = {} + if self.flags.samplesheet is not None: + self._print_missing_asset_warning(self.flags, True) + self.samples_dict = self._make_samples_dict( + self._check_sample_sheet(self.flags.samplesheet), + self.flags, + GetSamples(self.flags.input, self.flags.platform), + ) + self.samples_df = pd.DataFrame.from_dict(self.samples_dict, orient="index") + else: + self._print_missing_asset_warning(self.flags, False) + self.samples_dict = self._make_samples_dict( + None, self.flags, GetSamples(self.flags.input, self.flags.platform) + ) + if self.samples_df.empty: + self.samples_df = pd.DataFrame.from_dict(self.samples_dict, orient="index") + self.samples_df = self.samples_df.reset_index(drop=False).rename( + columns={"index": "SAMPLE"} + ) + ( + self.input_path, + self.workdir, + self.exec_start_path, + self.snakefile, + ) = self._get_paths_for_workflow(self.flags) + if not self.samples_dict: + sys.exit(1) + log.info("[green]Successfully parsed all command line arguments[/green]") + self._check_sample_properties( + self.samples_dict + ) # raises errors if stuff is not right + + def _validate_cli_args(self) -> list[str] | None: + arg_errors = [] + if dir_path(self.flags.input) is False: + arg_errors.append( + f"'[magenta]{self.flags.input}[/magenta]' is not a directory." + ) + if self.flags.samplesheet is not None: + allowed_extensions = [".xls", ".xlsx", ".csv", ".tsv"] + if file_exists(self.flags.samplesheet) is False: + arg_errors.append( + f"'[magenta]{self.flags.samplesheet}[/magenta]' is not an existing file." + ) + if ( + check_file_extension( + allowed_extensions=allowed_extensions, fname=self.flags.samplesheet + ) + is False + ): + arg_errors.append( + f"'[magenta]{self.flags.samplesheet}[/magenta]' does not have a valid file extension.\nAllowed file extenstions for the samplesheet: {' '.join([f'[blue]{x}[/blue]' for x in allowed_extensions])}" + ) + if self.flags.reference is not None: + allowed_extensions = [".fasta", ".fa"] + if self.flags.reference == "NONE": + arg_errors.append( + f"'[magenta]{self.flags.reference}[/magenta]' cannot be given for the reference file." + ) + if file_exists(self.flags.reference) is False: + arg_errors.append( + f"'[magenta]{self.flags.reference}[/magenta]' is not an existing file." + ) + if ( + check_file_extension( + allowed_extensions=allowed_extensions, fname=self.flags.reference + ) + is False + ): + arg_errors.append( + f"'[magenta]{self.flags.reference}[/magenta]' does not have a valid file extension.\nAllowed file extenstions for the reference: {' '.join([f'[blue]{x}[/blue]' for x in allowed_extensions])}" + ) + if self.flags.primers is not None: + allowed_extensions = [".fasta", ".fa", ".bed"] + if file_exists(self.flags.primers) is False: + arg_errors.append( + f"'[magenta]{self.flags.primers}[/magenta]' is not an existing file." + ) + if ( + check_file_extension( + allowed_extensions=allowed_extensions, fname=self.flags.primers + ) + is False + ): + arg_errors.append( + f"'[magenta]{self.flags.primers}[/magenta]' does not have a valid file extension.\nAllowed file extenstions for the primers: {' '.join([f'[blue]{x}[/blue]' for x in allowed_extensions])}" + ) + if self.flags.features is not None: + allowed_extensions = [".gff", ".gff3"] + if file_exists(self.flags.features) is False: + arg_errors.append( + f"'[magenta]{self.flags.features}[/magenta]' is not an existing file." + ) + if ( + check_file_extension( + allowed_extensions=allowed_extensions, fname=self.flags.features + ) + is False + ): + arg_errors.append( + f"'[magenta]{self.flags.features}[/magenta]' does not have a valid file extension.\nAllowed file extenstions for the features: {' '.join([f'[blue]{x}[/blue]' for x in allowed_extensions])}" + ) + return arg_errors + + def _check_sample_properties(self, sampleinfo: dict[Hashable, Any]) -> None: + """Check that the reference fasta file exists and is valid + + Parameters + ---------- + sampleinfo : dict[str, dict[str, str]] + A dictionary of dictionaries. The outer dictionary is keyed by sample name, and the inner + dictionary is keyed by the parameter name. + + """ + reference_files: set = set() + for item in sampleinfo: + if sample := sampleinfo.get(item): + if reffile := sample.get("REFERENCE"): + if not os.path.isfile(reffile): + log.error( + f"[bold red]The given reference fasta file for sample '{sample}' does not exist. Please check the reference fasta and try again. Exiting...[/bold red]" + ) + sys.exit(1) + reference_files.add(reffile) + reference_files = set(reference_files) + for f in reference_files: + CheckReferenceFile(f) + + def _get_args(self, givenargs: list[str]) -> argparse.Namespace: + """ + Parse the commandline args + """ + parser: argparse.ArgumentParser = RichParser( + prog=f"[bold]{__prog__}[/bold]", + usage="%(prog)s \[required arguments] \[optional arguments]", + description="%(prog)s: a pipeline for analysing Viral targeted (amplicon) sequencing data in order to generate a biologically valid consensus sequence.", + formatter_class=FlexibleArgFormatter, + add_help=False, + ) + required_args = parser.add_argument_group("Required arguments") + optional_args = parser.add_argument_group("Optional arguments") + + required_args.add_argument( + "--input", + "-i", + type=str, + metavar="DIR", + help="The input directory with raw fastq(.gz) files", + required=True, + ) + + required_args.add_argument( + "--output", + "-o", + metavar="DIR", + type=str, + default=os.getcwd(), # Default output dir is the current working dir + help="Output directory", + required=True, + ) + + optional_args.add_argument( + "--samplesheet", + "-samples", + metavar="File", + type=str, + # type=lambda s: check_file_extension([".xls", ".xlsx", ".csv", ".tsv"], s), + help="Sample sheet information file", + ) + + optional_args.add_argument( + "--reference", + "-ref", + type=str, + # type=lambda s: check_file_extension([".fasta", ".fa"], s), + metavar="File", + help="Input Reference sequence genome in FASTA format", + ) + + optional_args.add_argument( + "--primers", + "-pr", + type=str, + # type=lambda s: check_file_extension([".fasta", ".fa", ".bed"], s), + metavar="File", + help="Used primer sequences in FASTA or BED format. If no primers should be removed, supply the value NONE to this flag.", + ) + + required_args.add_argument( + "--platform", + default="nanopore", + const="nanopore", + nargs="?", + choices=("nanopore", "illumina", "iontorrent"), + help="Define the sequencing platform that was used to generate the dataset, either being 'nanopore', 'illumina' or 'iontorrent', see the docs for more info", + required=True, + metavar="'nanopore'/'illumina'/'iontorrent'", + ) + + required_args.add_argument( + "--amplicon-type", + "-at", + default="end-to-end", + const="end-to-end", + nargs="?", + choices=("end-to-end", "end-to-mid", "fragmented"), + help="Define the amplicon-type, either being 'end-to-end', 'end-to-mid', or 'fragmented'. See the docs for more info", + required=True, + metavar="'end-to-end'/'end-to-mid'/'fragmented'", + ) + + optional_args.add_argument( + "--target", + "--preset", + metavar="Str", + help="Define the specific target for the pipeline, if the target matches a certain preset then pre-defined analysis settings will be used, see the docs for more info", + ) + + optional_args.add_argument( + "--match-ref", + "-mr", + default=False, + action="store_true", + help="Match your data to the best reference available in the given reference fasta file.", + ) + + optional_args.add_argument( + "--min-coverage", + "-mc", + default=30, + type=int, + metavar="N", + help="Minimum coverage for the consensus sequence.", + ) + + optional_args.add_argument( + "--features", + "-gff", + type=str, + # type=lambda s: check_file_extension([".gff", ".gff3"], s), + metavar="File", + help="GFF file containing the Open Reading Frame (ORF) information of the reference. Supplying NONE will let ViroConstrictor use prodigal to determine coding regions", + ) + + optional_args.add_argument( + "--primer-mismatch-rate", + "-pmr", + type=float, + default=0.1, + metavar="N", + help="Maximum number of mismatches allowed in the primer sequences during primer coordinate search. Use 0 for exact primer matches\nDefault is 3.", + ) + + optional_args.add_argument( + "--disable-presets", + "-dp", + action="store_true", + default=False, + help="Disable the use of presets, this will cause all analysis settings to be set to default values", + ) + + optional_args.add_argument( + "--threads", + "-t", + default=min(multiprocessing.cpu_count(), 128), + metavar="N", + type=int, + help=f"Number of local threads that are available to use.\nDefault is the number of available threads in your system ({min(multiprocessing.cpu_count(), 128)})", + ) + + optional_args.add_argument( + "--version", + "-v", + version=__version__, + action="version", + help="Show the ViroConstrictor version and exit", + ) + + optional_args.add_argument( + "--help", + "-h", + action="help", + default=argparse.SUPPRESS, + help="Show this help message and exit", + ) + + optional_args.add_argument( + "--dryrun", + action="store_true", + help="Run the workflow without actually doing anything", + ) + + optional_args.add_argument( + "--skip-updates", + action="store_true", + help="Skip the update check", + ) + + if not givenargs: + log.error( + f"{parser.prog} was called but no arguments were given, please try again\nUse '[cyan]{parser.prog} -h[/cyan]' to see the help document" + ) + sys.exit(1) + else: + flags = parser.parse_args(givenargs) + + return flags + + def _check_sample_sheet(self, file: str) -> pd.DataFrame: + """Checks the sample sheet for the required columns. + + Parameters + ---------- + df + the sample sheet + + Returns + ------- + the result of the check_samplesheet_rows function. (pandas dataframe) + + """ + df = open_sample_sheet(file) + if not df.empty: + df.columns = df.columns.str.upper() + req_cols = check_samplesheet_columns(df) + if req_cols is False: + sys.exit(1) + if df.get("PRESET") is None: + df[["PRESET", "PRESET_SCORE"]] = df.apply( + lambda x: pd.Series( + match_preset_name(x["VIRUS"], use_presets=self.flags.presets) + ), + axis=1, + ) + return check_samplesheet_rows(df) + return pd.DataFrame() + + def _make_samples_dict( + self, + df: pd.DataFrame | None, + args: argparse.Namespace, + filedict: dict[str, str] | dict[str, dict[str, str]], + ) -> dict[Hashable, Any]: + """It takes a samplesheet (dataframe) and a dictionary fastq files, and returns a dictionary of sample + information + + Parameters + ---------- + df + the samplesheet dataframe + args + The arguments given to the script. + filedict + a dictionary of the files in the input directory + + Returns + ------- + A dictionary of the samplesheet and input directory. + + """ + if not CheckInputFiles(args.input): + log.error( + f"'[magenta]{args.input}[/magenta]' does not contain any valid FastQ files. Exiting..." + ) + sys.exit(1) + log.info( + f"[green]Valid FastQ files were found in the input directory.[/green] ('[magenta]{args.input}[/magenta]')" + ) + indirFrame: pd.DataFrame = sampledir_to_df(filedict, args.platform) + if df is not None: + df.set_index("SAMPLE", inplace=True) + df = pd.merge(df, indirFrame, left_index=True, right_index=True) + if df.empty: + log.error( + "[bold red]The files given in the samplesheet do not match the files given in the input-directory. Please check your samplesheet or input directory and try again.[/bold red]" + ) + sys.exit(1) + if len(indirFrame) > len(df): + log.error( + "[bold red]Not all samples in the input directory are present in the given samplesheet. Please check your samplesheet or input directory and try again.[/bold red]" + ) + sys.exit(1) + if len(indirFrame) < len(df): + log.error( + "[bold red]Either not all samples in the samplesheet are present in the given input directory, or there are duplicate samples in the samplesheet. Please check your samplesheet or input directory and try again.[/bold red]" + ) + sys.exit(1) + if df.get("PRIMER-MISMATCH-RATE") is None: + df["PRIMER-MISMATCH-RATE"] = args.primer_mismatch_rate + if df.get("MIN-COVERAGE") is None: + df["MIN-COVERAGE"] = args.min_coverage + if df.get("PRIMERS") is None: + df["PRIMERS"] = args.primers + if args.primers is None: + log.error( + "[bold red]No primer file specified in samplesheet or in command line options. Consider adding the -pr flag.[/bold red]" + ) + sys.exit(1) + if df.get("FEATURES") is None: + df["FEATURES"] = args.features + if args.features is None: + log.error( + "[bold red]No features file specified in samplesheet or in command line options. Consider adding the -gff flag.[/bold red]" + ) + sys.exit(1) + if df.get("PRESET") is None: + df[["PRESET", "PRESET_SCORE"]] = df.apply( + lambda x: pd.Series( + match_preset_name(x["VIRUS"], use_presets=args.presets) + ), + axis=1, + ) + df = pd.DataFrame.replace(df, np.nan, None) + return df.to_dict(orient="index") + return args_to_df(args, indirFrame).to_dict(orient="index") + + def _print_missing_asset_warning( + self, args: argparse.Namespace, sheet_present: bool + ) -> None: + """If a sample sheet is present, print a warning that conflicting run-wide settings given through the commandline will be ignored. + If no sample sheet is present, check if all required run-wide settings are given. If not, exit with a corresponding error message + + Parameters + ---------- + args : argparse.Namespace + argparse.Namespace + sheet_present : bool + boolean, whether a sample sheet was provided + + """ + if sheet_present: + if args.primers is not None: + log.warning( + "[yellow]Both a sample sheet and run-wide primer file was given, the primer file given through the commandline will be ignored[/yellow]" + ) + if args.reference is not None: + log.warning( + "[yellow]Both a sample sheet and run-wide reference fasta was given, the reference fasta given through the commandline will be ignored[/yellow]" + ) + if args.features is not None: + log.warn( + "[yellow]Both a sample sheet and run-wide GFF file was given, the GFF file given through the commandline will be ignored[/yellow]" + ) + if not sheet_present and any( + map( + lambda f: f is None, + {args.primers, args.reference, args.features, args.target}, + ) + ): + log.error( + f"[bold red]Run-wide analysis settings were not provided and no samplesheet was given either with per-sample run information.\nPlease either provide all required information ([underline]reference[/underline], [underline]primers[/underline], [underline]genomic features[/underline] and [underline]viral-target[/underline]) for a run-wide analysis or provide a samplesheet with per-sample run information[/bold red]" + ) + sys.exit(1) + + def _get_paths_for_workflow( + self, flags: argparse.Namespace + ) -> tuple[str, str, str, str]: + """Takes the input and output paths from the command line, and then creates the working directory if + it doesn't exist. It then changes the current working directory to the working directory + + Parameters + ---------- + flags : argparse.Namespace + argparse.Namespace: The flags that were passed to the script. + + Returns + ------- + A tuple of strings. + + """ + input_path: str = os.path.abspath(flags.input) + working_directory: str = os.path.abspath(flags.output) + exec_start_path: str = os.path.abspath(os.getcwd()) + snakefile: str = os.path.join( + os.path.abspath(os.path.dirname(__file__)), "workflow", "workflow.smk" + ) + + if not os.path.exists(working_directory): + os.makedirs(working_directory) + if os.getcwd() != working_directory: + os.chdir(working_directory) + + return input_path, working_directory, exec_start_path, snakefile + + +def file_exists(path: str) -> bool: + """Check if a file exists. + + Parameters + ---------- + path : str + The path to the file. + + Returns + ------- + bool + A boolean value. + + """ + return True if path == "NONE" else os.path.isfile(path) + + +def is_excel_file(ext: str) -> bool: + """If the extension is in the list of Excel extensions, return True, otherwise return False. Only checks the file extension, not file integrity. Parameters ---------- - ext + ext : str The extension of the file. Returns ------- + bool A boolean value. """ - return ext in [".xls", ".xlsx"] + return ext in {".xls", ".xlsx"} -def is_csv_file(ext): +def is_csv_file(ext: str) -> bool: """Return True if the file extension is .csv, otherwise return False. Only checks the file extension, not file integrity Parameters ---------- - ext + ext : str The extension of the file. Returns ------- + bool A boolean value. """ - return ext in [".csv"] + return ext in {".csv"} -def is_tsv_file(ext): +def is_tsv_file(ext: str) -> bool: """If the extension is in the list of extensions, return True, otherwise return False. Only checks the file extension, not file integrity. Parameters ---------- - ext + ext : str the file extension Returns ------- - A list of all the files in the directory that end with the extension .tsv - + bool + True if the extension is in the list of extensions, False otherwise """ - return ext in [".tsv"] + return ext in {".tsv"} -def open_sample_sheet(file): +def open_sample_sheet(file: str) -> pd.DataFrame: """Given a file, return a pandas dataframe created with correct open function Parameters ---------- - file + file: Path The file to open. Returns ------- + pd.DataFrame A pandas dataframe. """ # check if file is not empty if os.stat(file).st_size == 0: - print(f"{color.RED + color.BOLD}Samplesheet file is empty.{color.END}") + log.error("[red]Samplesheet file is empty.[/red]") sys.exit(1) file_extension = "".join(pathlib.Path(file).suffixes) - if is_excel_file(file_extension): - return pd.read_excel(file) - if is_csv_file(file_extension): - return pd.read_csv(file) - if is_tsv_file(file_extension): - return pd.read_csv(file, sep="\t") - raise TypeError(f"{file} is not a valid samplesheet file type.") - - -def required_cols(cols): + try: + if is_excel_file(file_extension): + return pd.read_excel(file) + if is_csv_file(file_extension): + return pd.read_csv(file) + if is_tsv_file(file_extension): + return pd.read_csv(file, sep="\t") + except Exception: + log.exception(f"{file} is not a valid samplesheet file type.") + return pd.DataFrame() + return pd.DataFrame() + + +def required_cols(cols: list[str]) -> bool: """The function required_cols takes a list of column names and checks to see if the required columns are present Parameters ---------- - cols + cols : list a list of column names Returns ------- + bool A boolean value. """ cols = [c.upper() for c in cols] - if any(i not in cols for i in ["SAMPLE", "VIRUS", "REFERENCE"]): - return False - return True + return all(i in cols for i in ["SAMPLE", "VIRUS", "REFERENCE"]) -def check_samplesheet_columns(df): +def check_samplesheet_columns(df: pd.DataFrame) -> bool: """Wrapper-function to check whether the samplesheet file has all the required columns or not Parameters @@ -125,19 +652,17 @@ def check_samplesheet_columns(df): Returns ------- + bool A boolean value. """ - if not required_cols(df.columns): - print( - f"{color.RED + color.BOLD}Missing required columns in samplesheet file.{color.END}", - file=sys.stderr, - ) + if not required_cols(df.columns.tolist()): + log.error("[bold red]Missing required columns in samplesheet file.[/bold red]") return False return True -def check_samplesheet_rows(df): +def check_samplesheet_rows(df: pd.DataFrame) -> pd.DataFrame: """Checks whether the row-based contents of the samplesheet dataframe are valid. Parameters @@ -200,26 +725,35 @@ def check_samplesheet_rows(df): "disallowed_characters": None, "path": False, }, + "PRESET": { + "dtype": str, + "required": True, + "disallowed_characters": None, + "path": False, + }, + "PRESET_SCORE": { + "dtype": float, + "required": True, + "disallowed_characters": None, + "path": False, + }, } - for (colName, colValue) in df.items(): + for colName, colValue in df.items(): if colName not in formats: - print( - f"{color.RED + color.BOLD}\tUnknown column '{colName}' in samplesheet.{color.END}\n\tPlease check the column-headers in your samplesheet file and try again.\n\tAllowed column-headers are as follows: {' | '.join(list(formats))}", - file=sys.stderr, + log.error( + f"[bold red]Unknown column '{colName}' in samplesheet.[/bold red]\n[yellow]Please check the column-headers in your samplesheet file and try again.\nAllowed column-headers are as follows: {' | '.join(list(formats))}[/yellow]" ) sys.exit(1) if formats[colName]["required"] is True: if any(colValue.isnull()): - print( - f"{color.RED + color.BOLD}\tNot all required information is given in column {color.UNDERLINE + colName}{color.END}\n\tPlease check your samplesheet file and try again.", - file=sys.stderr, + log.error( + f"[bold red]Not all required information is given in column [underline]{colName}[/underline][/bold red]\n[yellow]Please check your samplesheet file and try again.[/yellow]" ) sys.exit(1) for val in colValue: if not isinstance(val, formats[colName]["dtype"]): - print( - f"{color.RED + color.BOLD}\t{colName} column contains invalid data type.{color.END}\n\tPlease check your samplesheet file and try again.", - file=sys.stderr, + log.error( + f"[bold red]{colName} column contains invalid data type.[/bold red]\n[yellow]Please check your samplesheet file and try again.[/yellow]" ) sys.exit(1) if ( @@ -228,51 +762,28 @@ def check_samplesheet_rows(df): ): chars = re.compile(formats[colName]["disallowed_characters"]) if chars.search(val): - print( - f"{color.RED + color.BOLD}\t{colName} column contains one or more invalid characters.{color.END}\n\tPlease check your samplesheet file and try again.", - file=sys.stderr, + log.error( + f"[bold red]{colName} column contains one or more invalid characters.[/bold red]\n[yellow]Please check your samplesheet file and try again.[/yellow]" ) sys.exit(1) if formats[colName]["path"] is True and not os.path.isfile(val): - print( - f"{color.RED + color.BOLD}\t{colName} column contains a path which doesn't point to a file: '{val}'.{color.END}\n\tPlease check your samplesheet file and try again.", - file=sys.stderr, + log.error( + f"[bold red]{colName} column contains a path which doesn't point to a file: '{val}'.[/bold red]\n[yellow]Please check your samplesheet file and try again.[/yellow]" ) sys.exit(1) return df -def check_sample_sheet(file): - """Wrapper function that takes a samplesheet and triggers the checks for required columns and required (row-based) values. - - Parameters - ---------- - file - the path to the sample sheet - - Returns - ------- - the result of the check_samplesheet_rows function. (pandas dataframe) - - """ - df = open_sample_sheet(file) - df.columns = df.columns.str.upper() - req_cols = check_samplesheet_columns(df) - if req_cols is False: - sys.exit(1) - return check_samplesheet_rows(df) - - -def check_file_extension(allowed_extensions, fname): +def check_file_extension(allowed_extensions: list[str], fname: str) -> bool: """If the input file name is "NONE", return it; otherwise, check that the file exists and has a valid extension, and return the absolute path to the file Parameters ---------- - allowed_extensions - a list of file extensions that are allowed fname The name of the file to be checked. + allowed_extensions + a list of file extensions that are allowed Returns ------- @@ -280,237 +791,60 @@ def check_file_extension(allowed_extensions, fname): """ if fname == "NONE": - return fname - if os.path.isfile(fname): - ext = "".join(pathlib.Path(fname).suffixes) - if not any(ext.endswith(c) for c in allowed_extensions): - raise argparse.ArgumentTypeError( - f"Input file doesn't end with one of {allowed_extensions}" - ) - return os.path.abspath(fname) - print(f'"{fname}" is not a file. Exiting...') + return True + ext = "".join(pathlib.Path(fname).suffixes) + if not any(ext.endswith(c) for c in allowed_extensions): + return False + return True + log.error(f"'[magenta]{fname}[/magenta]' is not a file. Exiting...") sys.exit(-1) -def dir_path(arginput): +def dir_path(arginput: str) -> bool: """If the input is a directory, return it. Otherwise, print an error message and exit Parameters ---------- - arginput + arginput : str The input directory. Returns ------- + str the directory path. """ - if os.path.isdir(arginput): - return arginput - print(f'"{arginput}" is not a directory. Exiting...') - sys.exit(1) + return bool(os.path.isdir(arginput)) -def CheckInputFiles(indir): +def CheckInputFiles(indir: str) -> bool: """Check if the input files are valid fastq files The function takes one argument, indir, which is the directory where the input files are located. - The function returns a boolean value, True or False + The function returns a boolean value, True or False depending on whether the files in 'indir' are valid Parameters ---------- - indir + indir : str The directory where the input files are located Returns ------- + bool A boolean value. """ - allowedextensions = [".fastq", ".fq", ".fastq.gz", ".fq.gz"] - foundfiles = [] + allowedextensions: list[str] = [".fastq", ".fq", ".fastq.gz", ".fq.gz"] + foundfiles: list[str] = [] for filenames in os.listdir(indir): extensions = "".join(pathlib.Path(filenames).suffixes) foundfiles.append(extensions) - return bool(any(i in allowedextensions for i in foundfiles)) - - -def get_args(givenargs, parser): - """ - Parse the commandline args - """ - - required_args = parser.add_argument_group("Required arguments") - optional_args = parser.add_argument_group("Optional arguments") - - required_args.add_argument( - "--input", - "-i", - type=dir_path, - metavar="DIR", - help="The input directory with raw fastq(.gz) files", - required=True, - ) - - required_args.add_argument( - "--output", - "-o", - metavar="DIR", - type=str, - default=os.getcwd(), # Default output dir is the current working dir - help="Output directory", - required=True, - ) - - optional_args.add_argument( - "--samplesheet", - "-samples", - metavar="File", - type=lambda s: check_file_extension( - (".xls", ".xlsx", ".csv", ".tsv", ".json"), s - ), - help="Sample sheet information file", - ) - - optional_args.add_argument( - "--reference", - "-ref", - type=lambda s: check_file_extension((".fasta", ".fa"), s), - metavar="File", - help="Input Reference sequence genome in FASTA format", - ) - - optional_args.add_argument( - "--primers", - "-pr", - type=lambda s: check_file_extension((".fasta", ".fa", ".bed"), s), - metavar="File", - help="Used primer sequences in FASTA or BED format. If no primers should be removed, supply the value NONE to this flag.", - ) - - required_args.add_argument( - "--platform", - default="nanopore", - const="nanopore", - nargs="?", - choices=("nanopore", "illumina", "iontorrent"), - help="Define the sequencing platform that was used to generate the dataset, either being 'nanopore', 'illumina' or 'iontorrent', see the docs for more info", - required=True, - metavar="'nanopore'/'illumina'/'iontorrent'", - ) - - required_args.add_argument( - "--amplicon-type", - "-at", - default="end-to-end", - const="end-to-end", - nargs="?", - choices=("end-to-end", "end-to-mid", "fragmented"), - help="Define the amplicon-type, either being 'end-to-end', 'end-to-mid', or 'fragmented'. See the docs for more info", - required=True, - metavar="'end-to-end'/'end-to-mid'/'fragmented'", - ) - - optional_args.add_argument( - "--target", - "--preset", - metavar="Str", - help="Define the specific target for the pipeline, if the target matches a certain preset then pre-defined analysis settings will be used, see the docs for more info", - ) - - optional_args.add_argument( - "--match-ref", - "-mr", - default=False, - action="store_true", - help="Match your data to the best reference available in the given reference fasta file.", - ) - - optional_args.add_argument( - "--min-coverage", - "-mc", - default=30, - type=int, - metavar="N", - help="Minimum coverage for the consensus sequence.", - ) - - optional_args.add_argument( - "--features", - "-gff", - type=lambda s: check_file_extension((".gff", ".gff3"), s), - metavar="File", - help="GFF file containing the Open Reading Frame (ORF) information of the reference. Supplying NONE will let ViroConstrictor use prodigal to determine coding regions", - ) - - optional_args.add_argument( - "--primer-mismatch-rate", - "-pmr", - type=float, - default=0.1, - metavar="N", - help="Maximum number of mismatches allowed in the primer sequences during primer coordinate search. Use 0 for exact primer matches\nDefault is 3.", - ) - - optional_args.add_argument( - "--disable-presets", - "-dp", - action="store_true", - default=False, - help="Disable the use of presets, this will cause all analysis settings to be set to default values", - ) - - optional_args.add_argument( - "--threads", - "-t", - default=min(multiprocessing.cpu_count(), 128), - metavar="N", - type=int, - help=f"Number of local threads that are available to use.\nDefault is the number of available threads in your system ({min(multiprocessing.cpu_count(), 128)})", - ) - - optional_args.add_argument( - "--version", - "-v", - version=__version__, - action="version", - help="Show the ViroConstrictor version and exit", - ) - - optional_args.add_argument( - "--help", - "-h", - action="help", - default=argparse.SUPPRESS, - help="Show this help message and exit", - ) - - optional_args.add_argument( - "--dryrun", - action="store_true", - help="Run the workflow without actually doing anything", - ) - - optional_args.add_argument( - "--skip-updates", - action="store_true", - help="Skip the update check", - ) - - if len(givenargs) < 1: - print( - f"{parser.prog} was called but no arguments were given, please try again\n\tUse '{parser.prog} -h' to see the help document" - ) - sys.exit(1) - else: - flags = parser.parse_args(givenargs) - - return flags + return any((i in allowedextensions for i in foundfiles)) -def args_to_df(args, df): +def args_to_df(args: argparse.Namespace, df: pd.DataFrame) -> pd.DataFrame: """It takes the arguments from the command line and places them into a dataframe Parameters @@ -537,169 +871,30 @@ def args_to_df(args, df): return df -def sampledir_to_df(sampledict, platform): - """Function converts a dictionary of samples to a pandas dataframe +def sampledir_to_df( + sampledict: dict[str, str] | dict[str, dict[str, str]], platform: str +) -> pd.DataFrame: + """Takes a dictionary of sample names and lists of input files, and returns a dataframe with the + sample names as the index and the input files as the columns Parameters ---------- - sampledict - a dictionary of sample names and their input files - platform + sampledict : dict[str, list[str]] + A dictionary of sample names to lists of input files. + platform : str The sequencing platform used to generate the data. Returns ------- - A dataframe with the sample name as the index and the input file as the column. + A dataframe with the sample names as the index and the input files as the columns. """ frame = pd.DataFrame.from_dict(sampledict, orient="index") if platform == "illumina": frame.index.rename("SAMPLE", inplace=True) return frame - if platform in ["nanopore", "iontorrent"]: + if platform in {"nanopore", "iontorrent"}: frame.index.rename("SAMPLE", inplace=True) frame.rename(columns={0: "INPUTFILE"}, inplace=True) return frame raise ValueError(f"Platform {platform} not supported") - - -def make_sampleinfo_dict(df, args, filedict): - """It takes a samplesheet (dataframe) and a dictionary fastq files, and returns a dictionary of sample - information - - Parameters - ---------- - df - the samplesheet dataframe - args - The arguments given to the script. - filedict - a dictionary of the files in the input directory - - Returns - ------- - A dictionary of the samplesheet and input directory. - - """ - if not CheckInputFiles(args.input): - print( - f"\n{color.RED + color.BOLD}'{args.input}' does not contain any valid FastQ files. Exiting...{color.END}\n" - ) - sys.exit(1) - print( - f"\n{color.GREEN}Valid input files were found in the input directory.{color.END} ('{args.input}')\n" - ) - indirFrame = sampledir_to_df(filedict, args.platform) - if df is not None: - df.set_index("SAMPLE", inplace=True) - df = pd.merge(df, indirFrame, left_index=True, right_index=True) - if df.empty: - print( - f"\n{color.RED + color.BOLD}The files given in the samplesheet do not match the files given in the input-directory. Please check your samplesheet or input directory and try again.{color.END}\n" - ) - sys.exit(1) - if len(indirFrame) > len(df): - print( - f"\n{color.RED + color.BOLD}Not all samples in the input directory are present in the given samplesheet. Please check your samplesheet or input directory and try again.{color.END}\n" - ) - sys.exit(1) - if len(indirFrame) < len(df): - print( - f"\n{color.RED + color.BOLD}Either not all samples in the samplesheet are present in the given input directory, or there are duplicate samples in the samplesheet. Please check your samplesheet or input directory and try again.{color.END}\n" - ) - sys.exit(1) - if df.get("PRIMER-MISMATCH-RATE") is None: - df["PRIMER-MISMATCH-RATE"] = args.primer_mismatch_rate - if df.get("MIN-COVERAGE") is None: - df["MIN-COVERAGE"] = args.min_coverage - if df.get("PRIMERS") is None: - df["PRIMERS"] = args.primers - if args.primers is None: - print( - f"\n{color.RED + color.BOLD}No primer file specified in samplesheet or in command line options. Consider adding the -pr flag.{color.END}\n" - ) - sys.exit(1) - if df.get("FEATURES") is None: - df["FEATURES"] = args.features - if args.features is None: - print( - f"\n{color.RED + color.BOLD}No features file specified in samplesheet or in command line options. Consider adding the -gff flag.{color.END}\n" - ) - sys.exit(1) - if df.get("PRESET") is None: - df[["PRESET", "PRESET_SCORE"]] = df.apply( - lambda x: pd.Series( - match_preset_name(x["VIRUS"], use_presets=args.presets) - ), - axis=1, - ) - df = pd.DataFrame.replace(df, np.nan, None) - return df.to_dict(orient="index") - return args_to_df(args, indirFrame).to_dict(orient="index") - - -def ValidArgs(sysargs): - """Wrapper function which takes the command line arguments and returns a dictionary with all the information needed to run the pipeline - - Parameters - ---------- - sysargs - the command line arguments - - Returns - ------- - args, sampleinfo - - """ - parser = RichParser( - prog=f"[bold]{__prog__}[/bold]", - usage="%(prog)s \[required arguments] \[optional arguments]", - description="%(prog)s: a pipeline for analysing Viral targeted (amplicon) sequencing data in order to generate a biologically valid consensus sequence.", - formatter_class=FlexibleArgFormatter, - add_help=False, - ) - args = get_args(sysargs, parser) - - df = None - - # invert the flag as this is easier to check downstream - args.presets = args.disable_presets is False - - if args.samplesheet is not None: - if args.primers is not None: - print( - f"{color.YELLOW}Both a sample sheet and run-wide primer file was given, the given run-wide primer file will be ignored{color.END}" - ) - if args.reference is not None: - print( - f"{color.YELLOW}Both a sample sheet and run-wide reference fasta was given, the given run-wide reference fasta will be ignored{color.END}" - ) - if args.features is not None: - print( - f"{color.YELLOW}Both a sample sheet and run-wide GFF file was given, the given run-wide GFF file will be ignored{color.END}" - ) - - df = check_sample_sheet(args.samplesheet) - sampleinfo = make_sampleinfo_dict( - df, args, GetSamples(args.input, args.platform) - ) - else: - if any( - map( - lambda f: f is None, - {args.primers, args.reference, args.features, args.target}, - ) - ): - print( - f"{color.RED + color.BOLD}Run-wide analysis settings were not provided and no samplesheet was given either with per-sample run information.\nPlease either provide all required information (reference, primers, genomic features and viral-target) for a run-wide analysis or provide a samplesheet with per-sample run information{color.END}" - ) - sys.exit(1) - sampleinfo = make_sampleinfo_dict( - None, args, GetSamples(args.input, args.platform) - ) - - if df is None: - df = pd.DataFrame.from_dict(sampleinfo, orient="index") - if not sampleinfo: - sys.exit(1) - return args, sampleinfo, df diff --git a/ViroConstrictor/runconfigs.py b/ViroConstrictor/runconfigs.py index 9c3fec6..4865dac 100644 --- a/ViroConstrictor/runconfigs.py +++ b/ViroConstrictor/runconfigs.py @@ -6,23 +6,27 @@ import multiprocessing import os +from typing import Any import yaml +from ViroConstrictor.parser import CLIparser -def WriteYaml(data, filepath): + +def WriteYaml(data: dict, filepath: str) -> str: """WriteYaml takes a dictionary and a filepath, and writes the given dictionary to the filepath as a yaml file Parameters ---------- - data + data : dict The data to be written to the file. - filepath + filepath : str The path to the file you want to write to. Returns ------- + filepath : str The filepath """ @@ -33,166 +37,102 @@ def WriteYaml(data, filepath): return filepath -def set_cores(cores): - """Balance the requested number of cores to the available number of cores on the system to ensure analysis will not overload the system in local-analysis mode. - If the number of cores requested is equal to the number of cores available, return the number of cores requested minus 2. - If the number of cores requested is greater than the number of cores available, then return the - number of cores available minus 2. - Otherwise, return the number of cores requested - - Parameters - ---------- - cores - The number of cores to use for parallel processing given by the user. - - Returns - ------- - The number of cores that can be used by ViroConstrictor during analysis. - - """ - available = multiprocessing.cpu_count() - if cores == available: - return cores - 2 - if cores > available: - return available - 2 - return cores - - -def get_max_local_mem(): - """It returns the maximum amount of local memory that can be allocated to a single process on the - current machine - - Returns - ------- - The amount of memory available in MB. - - """ - avl_mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") - return int(round(avl_mem_bytes / (1024.0**2) - 2000, -3)) - - -def SnakemakeConfig(conf, cores, dryrun): - """Function takes the configuration file and the number of cores to use, and returns a dictionary that can be - used to configure Snakemake - - Parameters - ---------- - conf - the configuration file - cores - The number of cores to use for each job. - dryrun - If True, the pipeline will only execute in dry-run mode. - - Returns - ------- - A dictionary with the configuration for the snakemake workflow. - - """ - cores = set_cores(cores) - compmode = conf["COMPUTING"]["compmode"] - - if compmode == "local": - config = { +class GetSnakemakeRunDetails: + def __init__(self, inputs_obj: CLIparser) -> None: + self.inputs = inputs_obj + self._snakemake_run_config() + self._snakemake_run_params() + + def _snakemake_run_config(self) -> dict[str, Any]: + self.snakemake_run_conf = {} + cores = self._set_cores(self.inputs.flags.threads) + configuration = self.inputs.user_config + compmode = configuration["COMPUTING"]["compmode"] + + if compmode == "grid": + queuename = configuration["COMPUTING"]["queuename"] + # threads = "{threads}" + # mem = "{resources.mem_mb}" + self.snakemake_run_conf = { + "cores": 300, + "latency-wait": 60, + "use-conda": True, + "dryrun": self.inputs.flags.dryrun, + "jobname": "ViroConstrictor_{name}.jobid{jobid}", + "drmaa": f' -q {queuename} -n {{threads}} -R "span[hosts=1]" -M {{resources.mem_mb}}', + "drmaa-log-dir": "logs/drmaa", + } + return self.snakemake_run_conf + self.snakemake_run_conf = { "cores": cores, "latency-wait": 60, "use-conda": True, - "dryrun": dryrun, + "dryrun": self.inputs.flags.dryrun, "jobname": "ViroConstrictor_{name}.jobid{jobid}", } - - if compmode == "grid": - queuename = conf["COMPUTING"]["queuename"] - threads = "{threads}" - mem = "{resources.mem_mb}" - config = { - "cores": 300, - "latency-wait": 60, - "use-conda": True, - "dryrun": dryrun, - "jobname": "ViroConstrictor_{name}.jobid{jobid}", - "drmaa": f' -q {queuename} -n {threads} -R "span[hosts=1]" -M {mem}', - "drmaa-log-dir": "logs/drmaa", + return self.snakemake_run_conf + + def _snakemake_run_params(self) -> dict[str, Any]: + self.snakemake_run_parameters = {} + configuration = self.inputs.user_config + threads_highcpu: int = min(int(self.inputs.flags.threads - 2), 12) + threads_midcpu: int = min(self.inputs.flags.threads // 2, 6) + threads_lowcpu: int = 1 + if configuration["COMPUTING"]["compmode"] == "grid": + threads_highcpu = 12 + threads_midcpu = 6 + threads_lowcpu = 2 + self.snakemake_run_parameters = { + "sample_sheet": WriteYaml( + self.inputs.samples_dict, f"{self.inputs.workdir}/samplesheet.yaml" + ), + "computing_execution": configuration["COMPUTING"]["compmode"], + "max_local_mem": self._get_max_local_mem(), + "platform": self.inputs.flags.platform, + "amplicon_type": self.inputs.flags.amplicon_type, + "threads": { + "Alignments": threads_highcpu, + "QC": threads_midcpu, + "AdapterRemoval": threads_lowcpu, + "PrimerRemoval": threads_highcpu, + "Consensus": threads_midcpu, + "Index": threads_lowcpu, + "Typing": threads_lowcpu, + }, } - - return config - - -# Todo: possibly refactor this function to reduce parameters -def SnakemakeParams(conf, cores, sampleinfo, platform, samplesheet, amplicon_type): - """Function takes the configuration file, the number of cores, the sample information, the platform, the - sample sheet, and the amplicon type, and returns a dictionary with the sample sheet, the computing - execution mode, the maximum local memory, the platform, the amplicon type, the number of threads for - each step, and the run parameters - - Parameters - ---------- - conf - the configuration file - cores - the number of cores to use for the pipeline - sampleinfo - a dictionary of sample information - platform - The sequencing platform used to generate the data. - samplesheet - the path to the samplesheet - amplicon_type - This is the type of amplicon that you are using. This is used to determine the primer sequences to - remove. - - Returns - ------- - A dictionary with the following keys: - sample_sheet - computing_execution - max_local_mem - platform - amplicon_type - threads - runparams - - """ - if conf["COMPUTING"]["compmode"] == "local": - threads_highcpu = min(int(cores - 2), 12) - threads_midcpu = min(int(cores / 2), 6) - threads_lowcpu = 1 - if conf["COMPUTING"]["compmode"] == "grid": - threads_highcpu = 12 - threads_midcpu = 6 - threads_lowcpu = 2 - - return { - "sample_sheet": samplesheet, - "computing_execution": conf["COMPUTING"]["compmode"], - "max_local_mem": get_max_local_mem(), - "platform": platform, - "amplicon_type": amplicon_type, - "threads": { - "Alignments": threads_highcpu, - "QC": threads_midcpu, - "AdapterRemoval": threads_lowcpu, - "PrimerRemoval": threads_highcpu, - "Consensus": threads_midcpu, - "Index": threads_lowcpu, - "Typing": threads_lowcpu, - }, - } - - -def LoadConf(configfile): - """Function opens a yaml file, reads it, and returns the content as a dictionary - - Parameters - ---------- - configfile - The path to the yaml file. - - Returns - ------- - A dictionary of the yaml file. - - """ - with open(configfile, "r") as ConfIn: - conf = yaml.load(ConfIn, Loader=yaml.FullLoader) - return conf + return self.snakemake_run_parameters + + def _set_cores(self, cores: int) -> int: + """Balance the requested number of cores to the available number of cores on the system to ensure analysis will not overload the system in local-analysis mode. + If the number of cores requested is equal to the number of cores available, return the number of cores requested minus 2. + If the number of cores requested is greater than the number of cores available, then return the + number of cores available minus 2. + Otherwise, return the number of cores requested + + Parameters + ---------- + cores : int + The number of cores to use for parallel processing given by the user. + + Returns + ------- + cores : int + The number of cores that can be used by ViroConstrictor during analysis. + + """ + available: int = multiprocessing.cpu_count() + if cores == available: + return cores - 2 + return available - 2 if cores > available else cores + + def _get_max_local_mem(self) -> int: + """It returns the maximum amount of local memory that can be allocated to a single process on the + current machine + + Returns + ------- + The amount of memory available in MB. + + """ + avl_mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") + return int(round(avl_mem_bytes / (1024.0**2) - 2000, -3)) diff --git a/ViroConstrictor/runreport.py b/ViroConstrictor/runreport.py index 0c0df7a..0fccbdb 100644 --- a/ViroConstrictor/runreport.py +++ b/ViroConstrictor/runreport.py @@ -1,6 +1,8 @@ +import configparser import os import sys from datetime import datetime +from typing import Any, Literal from fpdf import FPDF @@ -8,10 +10,10 @@ class PDF(FPDF): - def timestamp(self): + def timestamp(self) -> str: return datetime.now().strftime("%d-%m-%Y %H:%M") - def header(self): + def header(self) -> None: self.set_font("Helvetica", size=23, style="B") self.cell(0, 20, "ViroConstrictor", align="C", ln=1) self.set_font("Helvetica", size=12) @@ -19,20 +21,24 @@ def header(self): self.set_font("Helvetica", size=8) self.cell(0, 5, self.timestamp(), align="C", ln=1) self.set_font("Helvetica", size=10, style="BU") - self.cell(0, 15, "Version: " + __version__, align="C", ln=1) + self.cell(0, 15, f"Version: {__version__}", align="C", ln=1) -def directory_sections(pdf, iteration, contents): +def directory_sections(pdf: PDF, iteration: int, contents: dict[int, list[str]]) -> PDF: pdf.set_font("Helvetica", size=12, style="B") - pdf.cell(40, 12, contents.get(iteration)[0], align="L") + if outdir := contents.get(iteration): + pdf.cell(40, 12, outdir[0], align="L") pdf.set_font("Helvetica", size=10) - pdf.cell(0, 12, contents.get(iteration)[1], align="L", ln=1) + if indir := contents.get(iteration): + pdf.cell(0, 12, indir[1], align="L", ln=1) pdf.set_font("Helvetica", size=12, style="I") - pdf.cell(0, 5, contents.get(iteration)[2], align="L", ln=1) + if startdir := contents.get(iteration): + pdf.cell(0, 5, startdir[2], align="L", ln=1) + # pdf.cell(0, 5, contents.get(iteration)[2], align="L", ln=1) return pdf -def analysis_details(pdf, header, text): +def analysis_details(pdf: PDF, header: str, text: str) -> PDF: pdf.set_font("Helvetica", size=12, style="B") pdf.cell(55, 5, header, align="L") pdf.set_font("Helvetica", size=10) @@ -40,13 +46,21 @@ def analysis_details(pdf, header, text): return pdf -def WriteReport(workingdir, inpath, startpath, conf, sparams, sconfig, status): +def WriteReport( + workingdir: str, + inpath: str, + startpath: str, + conf: configparser.ConfigParser, + sparams: dict[str, Any], + sconfig: dict[str, Any], + status: Literal["Failed", "Success"], +) -> None: if os.getcwd() != workingdir: os.chdir(workingdir) sconfig.update(sparams) - directories = { + directories: dict[int, list[str]] = { 0: [ "Output directory:", "This is the directory where the output files were written as well as this summary.", @@ -107,4 +121,4 @@ def WriteReport(workingdir, inpath, startpath, conf, sparams, sconfig, status): command = str(sys.argv[0]).split("/")[-1], *sys.argv[1:] pdf.multi_cell(0, 5, f'{" ".join(command)}') - pdf.output("Runinfomation.pdf", "F") + pdf.output(name="Runinfomation.pdf") diff --git a/ViroConstrictor/samplesheet.py b/ViroConstrictor/samplesheet.py index 004d7bc..aa03b8c 100644 --- a/ViroConstrictor/samplesheet.py +++ b/ViroConstrictor/samplesheet.py @@ -4,86 +4,88 @@ """ import os +import pathlib import re -import yaml - -def illumina_sheet(inputdir): - """Function takes a directory as input, and returns a dictionary of dictionaries, where the keys of the outer - dictionary are the sample names, and the keys of the inner dictionaries are the read numbers (R1 and - R2) +def illumina_sheet(inputdir: pathlib.Path) -> dict[str, dict[str, str]]: + """Takes a directory, walks through it, and returns a dictionary of dictionaries Parameters ---------- - inputdir - The directory where the fastq files are located. + inputdir : pathlib.Path + pathlib.Path = pathlib.Path("/path/to/input/directory") Returns ------- - A dictionary of dictionaries. + A dictionary of dictionaries. The outer dictionary has the sample name as the key and the inner + dictionary has the read number as the key and the file path as the value. """ - illuminapattern = re.compile(r"(.*)(_|\.)R?(1|2)(?:_.*\.|\..*\.|\.)f(ast)?q(\.gz)?") - samples = {} + illuminapattern: re.Pattern = re.compile( + r"(.*)(_|\.)R?(1|2)(?:_.*\.|\..*\.|\.)f(ast)?q(\.gz)?" + ) + samples: dict[str, dict[str, str]] = {} for dirname, subdir, filename in os.walk(inputdir): for files in filename: - fullpath = os.path.abspath(os.path.join(dirname, files)) + fullpath: str = os.path.abspath(os.path.join(dirname, files)) if match := illuminapattern.fullmatch(files): sample = samples.setdefault(match[1], {}) - sample[f"R{match[3]}"] = str(fullpath) + sample[f"R{match[3]}"] = fullpath return samples -def nanopore_sheet(inputdir): - """Function takes a directory as input, and returns a dictionary of sample names and their corresponding file +def nanopore_sheet(inputdir: pathlib.Path) -> dict[str, str]: + """Takes a directory as input, and returns a dictionary of sample names and their corresponding file paths Parameters ---------- - inputdir - the directory where the fastq files are located + inputdir : pathlib.Path + pathlib.Path = pathlib.Path("/path/to/input/directory") Returns ------- - A dictionary with the sample name as the key and the full path to the file as the value. + A dictionary of samples and their full path. """ - nanoporepattern = re.compile(r"(.*)\.f(ast)?q(\.gz)?") - samples = {} + nanoporepattern: re.Pattern = re.compile(r"(.*)\.f(ast)?q(\.gz)?") + samples: dict[str, str] = {} for dirname, subdir, filename in os.walk(inputdir): for files in filename: - fullpath = os.path.abspath(os.path.join(dirname, files)) + fullpath: str = os.path.abspath(os.path.join(dirname, files)) if match := nanoporepattern.fullmatch(files): samples.setdefault(match[1], fullpath) return samples -def iontorrent_sheet(inputdir): - """Function takes a directory as input, and returns a dictionary of sample names and their corresponding file +def iontorrent_sheet(inputdir: pathlib.Path) -> dict[str, str]: + """Takes a directory as input, and returns a dictionary of sample names and their corresponding file paths Parameters ---------- - inputdir - the directory where the fastq files are located + inputdir : pathlib.Path + pathlib.Path = pathlib.Path("/path/to/input/directory") Returns ------- A dictionary with the sample name as the key and the full path to the file as the value. """ - iontorrentpattern = re.compile(r"(.*)\.f(ast)?q(\.gz)?") - samples = {} + iontorrentpattern: re.Pattern = re.compile(r"(.*)\.f(ast)?q(\.gz)?") + samples: dict[str, str] = {} for dirname, subdir, filename in os.walk(inputdir): for files in filename: - fullpath = os.path.abspath(os.path.join(dirname, files)) + fullpath: str = os.path.abspath(os.path.join(dirname, files)) if match := iontorrentpattern.fullmatch(files): samples.setdefault(match[1], fullpath) return samples -def GetSamples(inputdir, platform): +def GetSamples( + inputdir: pathlib.Path, platform: str +) -> dict[str, str] | dict[str, dict[str, str]]: """Wrapping function taking in a directory and sequencing platform, triggers appropriate sub-function and returns a dictionary of samples Parameters @@ -99,9 +101,12 @@ def GetSamples(inputdir, platform): """ if platform == "illumina": - samples = illumina_sheet(inputdir) - if platform == "nanopore": - samples = nanopore_sheet(inputdir) - if platform == "iontorrent": - samples = iontorrent_sheet(inputdir) - return samples + illumina_samples: dict[str, dict[str, str]] = illumina_sheet(inputdir) + return illumina_samples + elif platform == "iontorrent": + iontorrent_samples: dict[str, str] = iontorrent_sheet(inputdir) + return iontorrent_samples + elif platform == "nanopore": + nanopore_samples: dict[str, str] = nanopore_sheet(inputdir) + return nanopore_samples + return {} diff --git a/ViroConstrictor/update.py b/ViroConstrictor/update.py index 479fee0..93e5c00 100644 --- a/ViroConstrictor/update.py +++ b/ViroConstrictor/update.py @@ -1,3 +1,4 @@ +import configparser import json import os import subprocess @@ -6,13 +7,12 @@ from urllib import request from ViroConstrictor import __version__ +from ViroConstrictor.logging import log +from ViroConstrictor.userprofile import AskPrompts -from .functions import color -from .userprofile import AskPrompts - - -def update(sysargs, conf): +# TODO refactor and split this function +def update(sysargs: list[str], conf: configparser.ConfigParser) -> None: autocontinue = conf["GENERAL"]["auto_update"] == "yes" ask_prompt = not autocontinue and conf["GENERAL"]["ask_for_update"] == "yes" if autocontinue: @@ -21,7 +21,7 @@ def update(sysargs, conf): "https://api.github.com/repos/RIVM-bioinformatics/Viroconstrictor/releases" ) except Exception as e: - sys.stderr.write("Unable to connect to GitHub API\n" f"{e}") + log.warning("Unable to connect to GitHub API\n" f"{e}") return latest_release = json.loads(latest_release.read().decode("utf-8"))[0] @@ -51,8 +51,8 @@ def update(sysargs, conf): stderr=subprocess.DEVNULL, ) - print( - f"ViroConstrictor updated to {color.YELLOW + color.BOLD}{latest_release_tag}{color.END}" + log.info( + f"ViroConstrictor updated to [bold yellow]{latest_release_tag}[/bold yellow]" ) subprocess.run(sysargs) @@ -63,10 +63,10 @@ def update(sysargs, conf): ): if ( AskPrompts( - f"""{color.RED + color.BOLD}There's a new version of ViroConstrictor available. This new version is a {color.UNDERLINE}major{color.END + color.RED + color.BOLD} update and cannot be installed automatically.{color.END} + f"""[bold red]There's a new version of ViroConstrictor available. This new version is a major[/underline] update and cannot be installed automatically.[/bold red] -Current version: {color.RED + color.BOLD}{'v' + __version__}{color.END} -Latest version: {color.GREEN + color.BOLD}{latest_release_tag}{color.END} +Current version: [bold red]{'v' + __version__}[/bold red] +Latest version: [bold green]{latest_release_tag}[/bold green] The auto-updater can't install major version changes for you, as this would (probably) severely break your installation. If you wish to update to the newest version you will have to do so manually. @@ -74,8 +74,8 @@ def update(sysargs, conf): If you want to run ViroConstrictor with the current version then please turn off the auto-updater. If you won't turn off the auto-updater we'll keep nagging you about this until you manually updated to the newest version. """, - "Do you want to turn off the auto-updater so you wont get this message again? (yes/no) ", - ("yes", "no"), + "Do you want to turn off the auto-updater so you wont get this message again? \[yes/no] ", + ["yes", "no"], fixedchoices=True, ) == "yes" @@ -87,15 +87,14 @@ def update(sysargs, conf): os.path.expanduser("~/.ViroConstrictor_defaultprofile.ini"), "w" ) as f: conf.write(f) - print("The ViroConstrictor auto-updater is now turned off") - print( + log.info("The ViroConstrictor auto-updater is now turned off") + log.info( f"Please re-run the ViroConstrictor command to execute the workflow with the current version ({'v' + __version__}) or update manually to the newest version" ) sys.exit(0) - print( - "ViroConstrictor is unable to update itself to the newest version as this is a major version change that cannot be installed automatically.\nPlease update manually and try again or turn-off the auto-updater " + log.error( + "ViroConstrictor is unable to update itself to the newest version as this is a major version change that cannot be installed automatically.\nPlease update manually and try again or turn-off the auto-updater\nExiting..." ) - print("Exiting...") sys.exit(1) return @@ -107,7 +106,7 @@ def update(sysargs, conf): "https://api.github.com/repos/RIVM-bioinformatics/Viroconstrictor/releases" ) except Exception as e: - sys.stderr.write("Unable to connect to GitHub API\n" f"{e}") + log.error("Unable to connect to GitHub API\n" f"{e}") return latest_release = json.loads(latest_release.read().decode("utf-8"))[0] @@ -128,9 +127,9 @@ def update(sysargs, conf): f""" There's a new version of ViroConstrictor available. -Current version: {color.RED + color.BOLD}{'v' + __version__}{color.END} -Latest version: {color.GREEN + color.BOLD}{latest_release_tag}{color.END}\n""", - """Do you want to update? [yes/no] """, +Current version: [bold red]{'v' + __version__}[/bold red] +Latest version: [bold green]{latest_release_tag}[/bold green]\n""", + """Do you want to update? \[yes/no] """, ["yes", "no"], fixedchoices=True, ) @@ -150,22 +149,21 @@ def update(sysargs, conf): stderr=subprocess.DEVNULL, ) - print( - f"ViroConstrictor updated to {color.YELLOW + color.BOLD}{latest_release_tag}{color.END}" + log.info( + f"ViroConstrictor updated to [bold yellow]{latest_release_tag}[/bold yellow]" ) subprocess.run(sysargs) sys.exit(0) - print(f"Skipping update to version {latest_release_tag}") - print("Continuing...") + log.info(f"Skipping update to version {latest_release_tag}\nContinuing...") return if localversion < latest_release_tag_tidied: - print( - f"{color.RED}There's a new version of ViroConstrictor available. This new version is a {color.UNDERLINE}major{color.END + color.RED} update and cannot be installed automatically.{color.END}" + log.warning( + "[red]There's a new version of ViroConstrictor available. This new version is a [underline]major[/underline] update and cannot be installed automatically.[/red]" ) - print( - f"""Current version: {color.RED + color.BOLD}v{__version__}{color.END}\nLatest version: {color.GREEN + color.BOLD}{latest_release_tag}{color.END}""" + log.warning( + f"""Current version: [bold red]v{__version__}[/bold red]\nLatest version: [bold green]{latest_release_tag}[/bold green]""" ) - print("Continuing without updating...\n") + log.warning("Continuing without updating...\n") return diff --git a/ViroConstrictor/userprofile.py b/ViroConstrictor/userprofile.py index 6f11e9f..df1885a 100644 --- a/ViroConstrictor/userprofile.py +++ b/ViroConstrictor/userprofile.py @@ -6,46 +6,45 @@ """ import configparser import os +import pathlib import readline import subprocess import sys -from .functions import color, tabCompleter +from rich import print +from rich.console import Console +from ViroConstrictor.functions import tabCompleter +from ViroConstrictor.logging import log -def FileExists(file): - """Function returns a boolean value (True or False) depending on whether the file exists or not - Parameters - ---------- - file - The file to check if it exists. +def FileExists(file: pathlib.Path) -> bool: + """Return True if the file exists, False if it doesn't. - Returns - ------- - True or False + Args: + file: The file to check. + Returns: + True if the file exists, False if it doesn't. """ return bool(os.path.isfile(file)) -def FileIsPopulated(file): - """If the file exists and is not empty, return True. Otherwise, return False - - Parameters - ---------- - file - The file to check. +def FileIsPopulated(file: pathlib.Path) -> bool: + """Checks if the given file is populated. - Returns - ------- - The size of the file in bytes. + Args: + file: The file to check. + Returns: + True if the given file is populated, False otherwise. """ return os.stat(file).st_size >= 1 -def AskPrompts(intro, prompt, options, fixedchoices=False): +def AskPrompts( + intro: str, prompt: str, options: list, fixedchoices: bool = False +) -> str | None: """This function is used to ask the user a question and provide a list of options to choose from. A free-text user reply is also possible. @@ -73,7 +72,7 @@ def AskPrompts(intro, prompt, options, fixedchoices=False): the reply variable. """ - if fixedchoices is True: + if fixedchoices: completer = tabCompleter() completer.createListCompleter(options) @@ -82,10 +81,13 @@ def AskPrompts(intro, prompt, options, fixedchoices=False): readline.set_completer(completer.listCompleter) subprocess.call("/bin/clear", shell=False) + + print(f"[bold blue]{'='*75}[/bold blue]") print(intro) + print(f"[bold blue]{'='*75}[/bold blue]") while "the answer is invalid": - if fixedchoices is True: - reply = input(prompt).lower().strip() + if fixedchoices: + reply = Console(soft_wrap=True).input(prompt) if reply in options: return reply if reply == "quit": @@ -95,7 +97,7 @@ def AskPrompts(intro, prompt, options, fixedchoices=False): print( "The given answer was invalid. Please choose one of the available options\n" ) - if fixedchoices is False: + if not fixedchoices: reply = input(prompt).strip() if reply == "quit": sys.exit(-1) @@ -103,7 +105,7 @@ def AskPrompts(intro, prompt, options, fixedchoices=False): return reply -def BuildConfig(file): +def BuildConfig(file: pathlib.Path) -> None: """Function asks the user a series of questions and writes the answers to a config file Parameters @@ -118,55 +120,51 @@ def BuildConfig(file): conf_object = configparser.ConfigParser() - conf_object["COMPUTING"] = { + conf_object["COMPUTING"] = { # type: ignore "compmode": AskPrompts( - f""" -ViroConstrictor can run in two computing-modes. -{color.YELLOW + color.UNDERLINE}local{color.END} or {color.YELLOW + color.UNDERLINE}HPC/Grid{color.END} -Please specify the computing-mode that you wish to use for ViroConstrictor. - """, - f"""Do you wish to run ViroConstrictor in {color.YELLOW}local{color.END} or {color.YELLOW}grid{color.END} mode? [local/grid] """, + """ViroConstrictor can run in two computing-modes. +[yellow underline]local[/yellow underline] or [yellow underline]HPC/Grid[/yellow underline] +Please specify the computing-mode that you wish to use for ViroConstrictor.""", + """Do you wish to run ViroConstrictor in [yellow]local[/yellow] or [yellow]grid[/yellow] mode? \[local/grid] """, ["local", "grid"], fixedchoices=True, ) } if conf_object["COMPUTING"]["compmode"] == "grid": - conf_object["COMPUTING"]["queuename"] = AskPrompts( - f""" -Grid mode has been chosen. Please enter the name of computing-queue that you wish to use on your grid/HPC cluster.\nThis is necessary so ViroConstrictor will send all the various tasks to the correct (remote) computers.\n\n{color.BOLD + color.UNDERLINE + color.YELLOW}Please note that this is case-sensitive{color.END}\n""", + conf_object["COMPUTING"]["queuename"] = AskPrompts( # type: ignore + f"""Grid mode has been chosen. Please enter the name of computing-queue that you wish to use on your grid/HPC cluster.\nThis is necessary so ViroConstrictor will send all the various tasks to the correct (remote) computers.\n\n[bold underline yellow]Please note that this is case-sensitive[/bold underline yellow]\n""", "Please specify the name of the Queue on your grid/HPC cluster that you wish to use. [free text] ", [], fixedchoices=False, ) - conf_object["GENERAL"] = { + conf_object["GENERAL"] = { # type: ignore "auto_update": AskPrompts( - f""" -ViroConstrictor can check and update itself everytime you run it. -Please specify whether you wish to enable the auto-update feature. - """, - f"""Do you wish to enable the auto-update feature? [yes/no] """, + """ViroConstrictor can check and update itself everytime you run it. +Please specify whether you wish to enable the auto-update feature.""", + f"""Do you wish to enable the auto-update feature? \[yes/no] """, ["yes", "no"], fixedchoices=True, ) } if conf_object["GENERAL"]["auto_update"] == "no": - conf_object["GENERAL"]["ask_for_update"] = AskPrompts( - f""" -ViroConstrictor will not automatically update itself, but ViroConstrictor can still check for updates and ask you if you wish to update. - """, - f"""Do you want ViroConstrictor to {color.YELLOW}ask you{color.END} to update everytime a new update is available? [yes/no] """, + conf_object["GENERAL"]["ask_for_update"] = AskPrompts( # type: ignore + """ViroConstrictor will not automatically update itself, but ViroConstrictor can still check for updates and ask you if you wish to update.""", + """Do you want ViroConstrictor to [yellow underline]ask you[/yellow underline] to update everytime a new update is available? \[yes/no] """, ["yes", "no"], fixedchoices=True, ) + subprocess.call("/bin/clear", shell=False) + with open(file, "w") as conffile: conf_object.write(conffile) + log.info("[green]Successfully written global configuration settings[/green]") -def AllOptionsGiven(config): +def AllOptionsGiven(config: configparser.ConfigParser) -> bool: """Function checks if all required config options are present in the already existing config file. Necessary to avoid missing config options when a user updates to a new version of ViroConstrictor. @@ -180,7 +178,7 @@ def AllOptionsGiven(config): A boolean value. """ - all_present = True + all_present: bool = True if config.has_section("COMPUTING") is True: if ( @@ -207,7 +205,7 @@ def AllOptionsGiven(config): return all_present -def ReadConfig(file): +def ReadConfig(file: pathlib.Path) -> configparser.ConfigParser: """ReadConfig() reads a config file, and if it doesn't exist, it creates it. -> If it does exist, but is empty, the configfile is recreated. -> If it exists and is populated, it reads it. @@ -235,4 +233,5 @@ def ReadConfig(file): BuildConfig(file) config = configparser.ConfigParser() config.read(file) + log.info("[green]Succesfully read global configuration file[/green]") return config diff --git a/ViroConstrictor/validatefasta.py b/ViroConstrictor/validatefasta.py index e6c958a..94dbca5 100644 --- a/ViroConstrictor/validatefasta.py +++ b/ViroConstrictor/validatefasta.py @@ -8,10 +8,10 @@ from Bio import SeqIO -from ViroConstrictor.functions import color +from ViroConstrictor.logging import log -def ContainsSpecials(seq): +def ContainsSpecials(seq: str) -> bool: """It takes a string as input and returns True if the string contains any characters other than the 20 amino acids, '-', or '*' @@ -22,24 +22,21 @@ def ContainsSpecials(seq): Returns ------- + bool True or False """ - chars = re.compile("[^actgumrwsykvhdbnACTGUMRWSYKVHDBN-]") - - if chars.search(seq) is None: - return False - return True + return chars.search(seq) is not None -def ContainsAmbiguities(seq): +def ContainsAmbiguities(seq: str) -> bool: """If the sequence contains any of the characters in the string "umrwsykvhdbnUMRWSYKVHDBN" (all possible nucleotide ambiguities), then return True, otherwise return False Parameters ---------- - seq + seq : str the sequence to be checked Returns @@ -48,22 +45,21 @@ def ContainsAmbiguities(seq): """ chars = re.compile("[umrwsykvhdbnUMRWSYKVHDBN]") - if chars.search(seq) is None: - return False - return True + return chars.search(seq) is not None -def IsValidRef(inputfile): +def IsValidRef(inputfile: str) -> bool: """If the input file is a valid FASTA file, and none of the sequences in the file contain ambiguous characters, then the file is a valid reference file Parameters ---------- - inputfile + inputfile : str The path to the reference file. Returns ------- + bool A boolean value. """ @@ -75,17 +71,18 @@ def IsValidRef(inputfile): return False -def IsValidFasta(inputfile): +def IsValidFasta(inputfile: str) -> bool: """Function takes a fasta file (path) as input and returns True if all sequences in the file are valid, and False if any sequence in given fasta is invalid Parameters ---------- - inputfile + inputfile : str The input file to check. Returns ------- + bool A boolean value. """ @@ -95,30 +92,30 @@ def IsValidFasta(inputfile): ContainsSpecials(str(record.seq)) for record in SeqIO.parse(inputfile, "fasta") ] - if any(results): - return False - return True + return not any(results) -def CheckReferenceFile(referencefile, warnings_as_errors=False): - """Checks reference file properties to be compatible. Does not return anything. +def CheckReferenceFile(referencefile: str, warnings_as_errors: bool = False) -> None: + """Checks that the reference file is in the correct format, and that it doesn't contain any + ambiguities Parameters ---------- - referencefile - The reference file to check. - warnings_as_errors, optional + referencefile : str + The path to the reference file. + warnings_as_errors : bool, optional If True, warnings will be treated as errors. """ - errors = [] - warnings = [] + errors: list[Exception] = [] + warnings: list[str] = [] for record in SeqIO.parse(referencefile, "fasta"): try: check_ref_header(record.id) except Exception as e: errors.append(e) + # Check whether there are stretches of ambiguities matches = re.split("[ACTGactg]", str(record.seq)) if longer_than_four := [m for m in matches if len(m) > 4]: errors.append( @@ -128,53 +125,47 @@ def CheckReferenceFile(referencefile, warnings_as_errors=False): ) ) + # Check whether there are any ambiguous nucleotides if ambiguities := sum(map(len, matches)): unique_ambiguities = "".join(set("".join(matches))) - w = Warning( - f"{ambiguities} Ambiguous nucleotides found in file {referencefile} in record {record.id}:\n" - f"\t{unique_ambiguities}\n" - "Check whether this is intended." - ) + w = f"""[cyan]{ambiguities}[/cyan] Ambiguous nucleotides found in file [magenta]{referencefile}[/magenta] in record [blue]{record.id}[/blue]:\t[bold yellow]{unique_ambiguities}[/bold yellow]\nPlease check whether this is intended.""" if warnings_as_errors: - errors.append(w) + errors.append(Exception(w)) else: warnings.append(w) if warnings: - print(color.YELLOW) for w in warnings: - print(w.args[0]) - print(color.END) + log.warning(f"{w}") if errors: - print(color.RED) - print("Error:") for e in errors: - print(e.args[0]) - print(color.END) + log.error(f"{e}") exit(1) -def check_ref_header(s): - """Function checks wether or not the reference header is valid. - Checks include blacklisted characters (e.g. characters used in system paths, or '*) and reserved windows words (e.g. 'CON' & 'AUX'). - Returns the header if it is valid, otherwise raises an error. +def check_ref_header(header: str) -> str | None: + """Checks the header of a reference fasta file to make sure there are no blacklisted or forbidden characters. + Returns the header if it is valid, otherwise exits the program with an error message. Parameters ---------- - s - the string to check + header : str + The header of the reference fasta file. Returns ------- - A list of tuples. + str + the fasta header. """ - if not s: - raise ValueError("Reference fasta does not have a header. Please add it.") + if not header: + log.error("Reference fasta does not have a header. Please add it.") + exit(1) blacklisted_characters = {"\\", "/", ":", "*", "?", '"', "<", ">", "|", "\0"} - if found_in_blacklist := {c for c in s if c in blacklisted_characters}: - raise ValueError( - f"Reference fasta header\n\t{s}\ncontains invalid characters\n\t{found_in_blacklist}\nPlease change the fasta header for this reference." + if found_in_blacklist := {c for c in header if c in blacklisted_characters}: + log.error( + f"Reference fasta header '[bold red]{header}[/bold red]' contains the following invalid characters [bold red]{found_in_blacklist}[/bold red]\nPlease change the fasta header for this reference." ) + exit(1) reserved_words_on_windows = { "CON", "PRN", @@ -199,12 +190,14 @@ def check_ref_header(s): "LPT8", "LPT9", } - if s in reserved_words_on_windows: - raise ValueError( - f"Reference fasta header\n\t{s}\nis a reserved word on the windows operating system. Please change it." + if header in reserved_words_on_windows: + log.error( + f"Reference fasta header '[bold red]{header}[/bold red]' is a reserved word on the windows operating system.\nPlease change it as it may cause problems." ) - if all(c == "." for c in s): - raise ValueError( - f"Reference fasta header\n\t{s}\nis not valid.\nPlease change the fasta header for this reference." + exit(1) + if all(c == "." for c in header): + log.error( + f"Reference fasta header '[bold red]{header}[/bold red]' is not a valid fasta header.\nPlease change the fasta header for this reference and try again." ) - return s + exit(1) + return header diff --git a/ViroConstrictor/workflow/presets.py b/ViroConstrictor/workflow/presets.py index c33e288..1c49e6e 100644 --- a/ViroConstrictor/workflow/presets.py +++ b/ViroConstrictor/workflow/presets.py @@ -107,7 +107,7 @@ } -def get_key_from_value(d: dict, value: str) -> str: +def get_key_from_value(d: dict, value: str) -> str | None: """This function finds the key in a dictionary which has a value matching the input value. Parameters @@ -134,7 +134,7 @@ def match_preset_name(targetname: str, use_presets: bool) -> Tuple[str, float]: query = re.sub(r"[^_a-zA-Z0-9/-]+", "", targetname).upper() if query == "DEFAULT": - return "DEFAULT" + return "DEFAULT", 0 # flatten list of lists aliases.values() into a single list aliases_list = [item for sublist in aliases.values() for item in sublist] @@ -144,8 +144,9 @@ def match_preset_name(targetname: str, use_presets: bool) -> Tuple[str, float]: if score < 0.35: return "DEFAULT", 0 - matched_preset = get_key_from_value(aliases, best_match) - return matched_preset, score + if matched_preset := get_key_from_value(aliases, best_match): + return matched_preset, score + return "DEFAULT", 0 def get_preset_parameter(preset_name: str, parameter_name: str) -> str: diff --git a/ViroConstrictor/workflow/workflow.smk b/ViroConstrictor/workflow/workflow.smk index 34c6abe..7b189b6 100644 --- a/ViroConstrictor/workflow/workflow.smk +++ b/ViroConstrictor/workflow/workflow.smk @@ -22,6 +22,11 @@ logger.handlers.clear() # you have to explicitly import the logger again, without this snakemake will not write to the log file. from snakemake import logger +# Elevate the log level of all output generated by the snakemake.logging module to CRITICAL in order to suppress it when snakemake is calling itself in a downstream process. +if "--snakefile" in sys.argv: + logging.getLogger("snakemake.logging").setLevel(logging.CRITICAL) +import ViroConstrictor + min_version("7.15") yaml.warnings({"YAMLLoadWarning": False}) @@ -873,21 +878,12 @@ rule multiqc_report: onsuccess: - print( - """ - ViroConstrictor is finished with processing all the files in the given input directory. - - Generating reports and shutting down... - """ - ) + ViroConstrictor.logging.log.info("[bold green]ViroConstrictor is finished with processing all the files in the given input directory.[/bold green]") + ViroConstrictor.logging.log.info("[bold green]Generating reports and shutting down...[/bold green]") return True onerror: - print( - """ - An error occurred and ViroConstrictor had to shut down. - Please check the input and logfiles for any abnormalities and try again. - """ - ) + ViroConstrictor.logging.log.error("[bold red]An error occurred and ViroConstrictor had to shut down.[/bold red]") + ViroConstrictor.logging.log.error("[bold red]Please check the input and logfiles for any abnormalities and try again.[/bold red]") return False diff --git a/setup.py b/setup.py index 1ff2ad4..86efaa1 100644 --- a/setup.py +++ b/setup.py @@ -75,10 +75,10 @@ ], entry_points={ "console_scripts": [ - "ViroConstrictor = ViroConstrictor.ViroConstrictor:main", - "viroconstrictor = ViroConstrictor.ViroConstrictor:main", - "viroConstrictor = ViroConstrictor.ViroConstrictor:main", - "Viroconstrictor = ViroConstrictor.ViroConstrictor:main", + "ViroConstrictor = ViroConstrictor.__main__:main", + "viroconstrictor = ViroConstrictor.__main__:main", + "viroConstrictor = ViroConstrictor.__main__:main", + "Viroconstrictor = ViroConstrictor.__main__:main", ] }, keywords=[],