From 183b3f20b66a1ed43bb2e04fbb126d8fc0da4642 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 28 Nov 2023 16:30:59 +0100 Subject: [PATCH 01/16] enh: prepare CLI script to run estimation, add ``--dry-run`` --- pyproject.toml | 4 +- sdcflows/cli/{find_estimators.py => main.py} | 68 +++++++++++++++----- sdcflows/cli/tests/test_find_estimators.py | 34 ++++++++-- sdcflows/utils/tests/test_wrangler.py | 2 +- 4 files changed, 85 insertions(+), 23 deletions(-) rename sdcflows/cli/{find_estimators.py => main.py} (52%) diff --git a/pyproject.toml b/pyproject.toml index befa79cb49..99cd5f33e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "nibabel >=3.1.0", "nipype >=1.8.5,<2.0", "traits <6.4", + "migas >= 0.4.0", "niworkflows >= 1.7.0", "nitransforms >= 23.0.1", "numpy >= 1.21.0", @@ -54,6 +55,7 @@ doc = [ "pydot >= 1.2.3", "pydotplus", "sphinx >= 7.2.2", + "sphinx-argparse", "sphinxcontrib-apidoc", "templateflow" ] @@ -83,7 +85,7 @@ tests = ["sdcflows[test]"] all = ["sdcflows[doc,test,mem,dev,test]"] [project.scripts] -sdcflows-find-estimators = "sdcflows.cli.find_estimators:main" +sdcflows = "sdcflows.cli.main:main" # # Hatch configurations diff --git a/sdcflows/cli/find_estimators.py b/sdcflows/cli/main.py similarity index 52% rename from sdcflows/cli/find_estimators.py rename to sdcflows/cli/main.py index c0192b442c..09ffd396fa 100644 --- a/sdcflows/cli/find_estimators.py +++ b/sdcflows/cli/main.py @@ -1,3 +1,26 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Standalone command line executable for estimation of fieldmaps.""" import argparse from pathlib import Path @@ -12,6 +35,13 @@ def _parser(): formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("bids_dir", type=Path, help="The input BIDS directory to parse") + parser.add_argument( + "-n", + "--dry-run", + action="store_true", + default=False, + help="only find estimable fieldmaps (that is, estimation is not triggered)", + ) parser.add_argument( "-s", "--subjects", @@ -30,7 +60,7 @@ def _parser(): metavar="PATH", type=Path, help="Path to a PyBIDS database folder, for faster indexing (especially " - "useful for large datasets). Will be created if not present." + "useful for large datasets). 
Will be created if not present.", ) parser.add_argument( "-v", @@ -54,14 +84,16 @@ def gen_layout(bids_dir, database_dir=None): "models", "derivatives", re.compile(r"^\."), - re.compile(r"sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|eeg|ieeg|meg|micr|perf)"), + re.compile( + r"sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|eeg|ieeg|meg|micr|perf)" + ), ), ) - layout_kwargs = {'indexer': _indexer} + layout_kwargs = {"indexer": _indexer} if database_dir: - layout_kwargs['database_path'] = database_dir + layout_kwargs["database_path"] = database_dir layout = BIDSLayout(bids_dir, **layout_kwargs) return layout @@ -80,7 +112,7 @@ def main(argv=None): bids_dir = pargs.bids_dir.resolve(strict=True) layout = gen_layout(bids_dir, pargs.bids_database_dir) subjects = collect_participants(layout, pargs.subjects) - logger = create_logger('sdcflow.wrangler', level=10 if pargs.verbose else 40) + logger = create_logger("sdcflow.wrangler", level=10 if pargs.verbose else 40) estimators_record = {} for subject in subjects: estimators_record[subject] = find_estimators( @@ -91,18 +123,20 @@ def main(argv=None): ) # pretty print results - print(f"Estimation for <{str(bids_dir)}> complete. Found:") - for subject, estimators in estimators_record.items(): - print(f"\tsub-{subject}") - if not estimators: - print("\t\tNo estimators found") - continue - for estimator in estimators: - print(f"\t\t{estimator}") - for fl in estimator.sources: - fl_relpath = fl.path.relative_to(str(bids_dir / f'sub-{subject}')) - pe_dir = fl.metadata.get("PhaseEncodingDirection") - print(f"\t\t\t{pe_dir}\t{fl_relpath}") + if pargs.dry_run: + print(f"Estimation for <{str(bids_dir)}> complete. Found:") + for subject, estimators in estimators_record.items(): + print(f"\tsub-{subject}") + if not estimators: + print("\t\tNo estimators found") + continue + for estimator in estimators: + print(f"\t\t{estimator}") + for fl in estimator.sources: + fl_relpath = fl.path.relative_to(str(bids_dir / f"sub-{subject}")) + pe_dir = fl.metadata.get("PhaseEncodingDirection") + print(f"\t\t\t{pe_dir}\t{fl_relpath}") + return if __name__ == "__main__": diff --git a/sdcflows/cli/tests/test_find_estimators.py b/sdcflows/cli/tests/test_find_estimators.py index 81c79446ba..3bdd9034c8 100644 --- a/sdcflows/cli/tests/test_find_estimators.py +++ b/sdcflows/cli/tests/test_find_estimators.py @@ -1,8 +1,31 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Check the CLI.""" import pytest from niworkflows.utils.testing import generate_bids_skeleton -from ..find_estimators import main as find_estimators -from ...fieldmaps import clear_registry +from sdcflows.cli.main import main as find_estimators +from sdcflows.fieldmaps import clear_registry OUTPUT = """\ Estimation for <{path}> complete. Found: @@ -102,12 +125,15 @@ @pytest.mark.parametrize( "test_id,config,estimator_id", - [("intendedfor", intendedfor_config, "auto_00000"), ("b0field", b0field_config, "pepolar")], + [ + ("intendedfor", intendedfor_config, "auto_00000"), + ("b0field", b0field_config, "pepolar"), + ], ) def test_find_estimators(tmp_path, capsys, test_id, config, estimator_id): path = tmp_path / test_id generate_bids_skeleton(path, config) - find_estimators([str(path)]) + find_estimators([str(path), "--dry-run"]) output = OUTPUT.format(path=path, estimator_id=estimator_id) out, _ = capsys.readouterr() assert out == output diff --git a/sdcflows/utils/tests/test_wrangler.py b/sdcflows/utils/tests/test_wrangler.py index 5d6ec120ad..ddc318a34b 100644 --- a/sdcflows/utils/tests/test_wrangler.py +++ b/sdcflows/utils/tests/test_wrangler.py @@ -3,7 +3,7 @@ from niworkflows.utils.testing import generate_bids_skeleton -from sdcflows.cli.find_estimators import gen_layout +from sdcflows.cli.main import gen_layout from sdcflows.utils.wrangler import find_estimators from sdcflows.fieldmaps import clear_registry From f29785f7a9f1f7dce4ccaf71092b1bcc9039cdfb Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 28 Nov 2023 16:54:51 +0100 Subject: [PATCH 02/16] enh: add CLI to documentation using sphinx extension for argparse Resolves: #392. --- docs/cli.rst | 8 ++++++++ docs/conf.py | 1 + docs/index.rst | 1 + docs/requirements.txt | 1 + 4 files changed, 11 insertions(+) create mode 100644 docs/cli.rst diff --git a/docs/cli.rst b/docs/cli.rst new file mode 100644 index 0000000000..619c7d9477 --- /dev/null +++ b/docs/cli.rst @@ -0,0 +1,8 @@ +Standalone command line usage +============================= +*SDCFlows* can execute fieldmap estimation from a BIDS-compliant dataset by using the *standalone command line interface*: + +.. 
argparse:: + :module: sdcflows.cli.main + :func: _parser + :prog: sdcflows diff --git a/docs/conf.py b/docs/conf.py index 09abb8c6db..a249a94531 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -35,6 +35,7 @@ "sphinx.ext.intersphinx", "sphinx.ext.mathjax", "sphinx.ext.viewcode", + "sphinxarg.ext", "sphinxcontrib.apidoc", "nbsphinx", "nipype.sphinxext.apidoc", diff --git a/docs/index.rst b/docs/index.rst index 4045ec7d62..7615905544 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,5 +9,6 @@ Contents installation examples methods + cli api changes diff --git a/docs/requirements.txt b/docs/requirements.txt index ae30735e4d..3a8b556259 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -15,6 +15,7 @@ pydot >= 1.2.3 pydotplus scipy sphinx >= 7.2.2 +sphinx-argparse sphinxcontrib-apidoc templateflow traits < 6.4 From d65c845fa5344f5e30911b17b493086663f7ed24 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 28 Nov 2023 17:17:05 +0100 Subject: [PATCH 03/16] enh: populate CLI with BIDS-Apps options --- sdcflows/cli/main.py | 236 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 208 insertions(+), 28 deletions(-) diff --git a/sdcflows/cli/main.py b/sdcflows/cli/main.py index 09ffd396fa..080922b743 100644 --- a/sdcflows/cli/main.py +++ b/sdcflows/cli/main.py @@ -21,53 +21,230 @@ # https://www.nipreps.org/community/licensing/ # """Standalone command line executable for estimation of fieldmaps.""" -import argparse +import re +from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser +from functools import partial from pathlib import Path +from sdcflows import __version__ as thisversion -def _drop_sub(value): - return value[4:] if value.startswith("sub-") else value + +def _parse_participant_labels(value): + """ + Drop ``sub-`` prefix of participant labels. 
+ + >>> _parse_participant_labels("s060") + ['s060'] + >>> _parse_participant_labels("sub-s060") + ['s060'] + >>> _parse_participant_labels("s060 sub-s050") + ['s050', 's060'] + >>> _parse_participant_labels("s060 sub-s060") + ['s060'] + >>> _parse_participant_labels("s060\tsub-s060") + ['s060'] + + """ + return sorted( + set( + re.sub(r"^sub-", "", item.strip()) + for item in re.split(r"\s+", f"{value}".strip()) + ) + ) def _parser(): - parser = argparse.ArgumentParser( - description="Parse a BIDS directory and show what estimators are available", - formatter_class=argparse.RawDescriptionHelpFormatter, + class ParticipantLabelAction(Action): + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, _parse_participant_labels(" ".join(values))) + + def _path_exists(path, parser): + """Ensure a given path exists.""" + if path is None or not Path(path).exists(): + raise parser.error(f"Path does not exist: <{path}>.") + return Path(path).expanduser().absolute() + + def _min_one(value, parser): + """Ensure an argument is not lower than 1.""" + value = int(value) + if value < 1: + raise parser.error("Argument can't be less than one.") + return value + + def _to_gb(value): + scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9} + digits = "".join([c for c in value if c.isdigit()]) + n_digits = len(digits) + units = value[n_digits:] or "G" + return int(digits) * scale[units[0]] + + def _bids_filter(value): + from json import loads + + if value and Path(value).exists(): + return loads(Path(value).read_text()) + + parser = ArgumentParser( + description=f"""\ +SDCFlows {thisversion} + +Estimate fieldmaps available in a BIDS-compliant MRI dataset.""", + formatter_class=ArgumentDefaultsHelpFormatter, ) - parser.add_argument("bids_dir", type=Path, help="The input BIDS directory to parse") + PathExists = partial(_path_exists, parser=parser) + PositiveInt = partial(_min_one, parser=parser) + parser.add_argument( - "-n", - "--dry-run", - action="store_true", - default=False, - help="only find estimable fieldmaps (that is, estimation is not triggered)", + "bids_dir", + action="store", + type=PathExists, + help="The root folder of a BIDS valid dataset (sub-XXXXX folders should " + "be found at the top level in this folder).", ) parser.add_argument( - "-s", - "--subjects", - type=_drop_sub, + "output_dir", + action="store", + type=Path, + help="The directory where the output files " + "should be stored. If you are running group level analysis " + "this folder should be prepopulated with the results of the " + "participant level analysis.", + ) + parser.add_argument( + "analysis_level", + action="store", nargs="+", - help="One or more subject identifiers", + help="Level of the analysis that will be performed. 
" + "Multiple participant level analyses can be run independently " + "(in parallel) using the same output_dir.", + choices=["participant", "group"], ) + + # optional arguments parser.add_argument( - "--fmapless", - action="store_true", - default=False, - help="Allow fieldmap-less estimation", + "--version", action="version", version=f"SDCFlows {thisversion}" ) parser.add_argument( + "-v", + "--verbose", + dest="verbose_count", + action="count", + default=0, + help="Increases log verbosity for each occurrence, debug level is -vvv.", + ) + + # main options + g_bids = parser.add_argument_group("Options related to BIDS") + g_bids.add_argument( + "--participant-label", + "--participant_label", + "--participant-labels", + "--participant_labels", + dest="participant_label", + action=ParticipantLabelAction, + nargs="+", + help="A space delimited list of participant identifiers or a single " + "identifier (the sub- prefix can be removed).", + ) + g_bids.add_argument( + "--bids-filter-file", + action="store", + type=Path, + metavar="PATH", + help="a JSON file describing custom BIDS input filter using pybids " + "{:{:,...},...} " + "(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)", + ) + g_bids.add_argument( "--bids-database-dir", metavar="PATH", + type=PathExists, + help="Path to an existing PyBIDS database folder, for faster indexing " + "(especially useful for large datasets).", + ) + + # General performance + g_perfm = parser.add_argument_group("Options to handle performance") + g_perfm.add_argument( + "--nprocs", + "--n_procs", + "--n_cpus", + "-n-cpus", + action="store", + type=PositiveInt, + help="""\ +Maximum number of simultaneously running parallel processes executed by *MRIQC* \ +(e.g., several instances of ANTs' registration). \ +However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \ +it also sets the maximum number of threads that simultaneously running processes \ +may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \ +8-CPU-threaded processes will be running at a given time). \ +Under this mode of operation, ``--nprocs`` sets the maximum number of processors \ +that can be assigned work within an *MRIQC* job, which includes all the processors \ +used by currently running single- and multi-threaded processes. \ +If ``None``, the number of CPUs available will be automatically assigned (which may \ +not be what you want in, e.g., shared systems like a HPC cluster.""", + ) + g_perfm.add_argument( + "--omp-nthreads", + "--ants-nthreads", + action="store", + type=PositiveInt, + help="""\ +Maximum number of threads that multi-threaded processes executed by *MRIQC* \ +(e.g., ANTs' registration) can use. 
\ +If ``None``, the number of CPUs available will be automatically assigned (which may \ +not be what you want in, e.g., shared systems like a HPC cluster.""", + ) + g_perfm.add_argument( + "--mem", + "--mem_gb", + "--mem-gb", + dest="memory_gb", + action="store", + type=_to_gb, + help="Upper bound memory limit for MRIQC processes.", + ) + g_perfm.add_argument( + "--testing", + dest="debug", + action="store_true", + default=False, + help="Use testing settings for a minimal footprint.", + ) + + g_perfm.add_argument( + "--pdb", + dest="pdb", + action="store_true", + default=False, + help="Open Python debugger (pdb) on exceptions.", + ) + + # Control instruments + g_outputs = parser.add_argument_group("Instrumental options") + g_outputs.add_argument( + "-w", + "--work-dir", + action="store", type=Path, - help="Path to a PyBIDS database folder, for faster indexing (especially " - "useful for large datasets). Will be created if not present.", + default=Path("work").absolute(), + help="Path where intermediate results should be stored.", ) - parser.add_argument( - "-v", - "--verbose", + g_outputs.add_argument( + "-n", + "--dry-run", + action="store_true", + default=False, + help="only find estimable fieldmaps (that is, estimation is not triggered)", + ) + g_outputs.add_argument( + "--fmapless", action="store_true", - help="Print information while finding estimators (Useful for debugging)", + default=False, + help="Allow fieldmap-less estimation", ) + return parser @@ -103,6 +280,7 @@ def main(argv=None): """ Parse a BIDS directory and print subject estimators """ + from logging import DEBUG from niworkflows.utils.bids import collect_participants from sdcflows.utils.wrangler import find_estimators from sdcflows.utils.misc import create_logger @@ -111,8 +289,10 @@ def main(argv=None): bids_dir = pargs.bids_dir.resolve(strict=True) layout = gen_layout(bids_dir, pargs.bids_database_dir) - subjects = collect_participants(layout, pargs.subjects) - logger = create_logger("sdcflow.wrangler", level=10 if pargs.verbose else 40) + subjects = collect_participants(layout, pargs.participant_label) + logger = create_logger( + "sdcflow.wrangler", int(max(25 - 5 * pargs.verbose_count, DEBUG)) + ) estimators_record = {} for subject in subjects: estimators_record[subject] = find_estimators( From d13e7c973b0e15274a45302936574253e2e10c53 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 28 Nov 2023 17:52:46 +0100 Subject: [PATCH 04/16] wip: execute workflow --- sdcflows/cli/main.py | 5 +++++ sdcflows/workflows/fit/base.py | 38 ++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 sdcflows/workflows/fit/base.py diff --git a/sdcflows/cli/main.py b/sdcflows/cli/main.py index 080922b743..5ca50055b7 100644 --- a/sdcflows/cli/main.py +++ b/sdcflows/cli/main.py @@ -318,6 +318,11 @@ def main(argv=None): print(f"\t\t\t{pe_dir}\t{fl_relpath}") return + from sdcflows.workflows.fit.base import init_sdcflows_wf + + wf = init_sdcflows_wf(estimators_record, pargs.work_dir) + wf.run() + if __name__ == "__main__": main() diff --git a/sdcflows/workflows/fit/base.py b/sdcflows/workflows/fit/base.py new file mode 100644 index 0000000000..a3b01f107a --- /dev/null +++ b/sdcflows/workflows/fit/base.py @@ -0,0 +1,38 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2021 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with 
the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Build a dataset-wide estimation workflow.""" + + +def init_sdcflows_wf(estimators, work_dir): + """Create a multi-subject, multi-estimator *SDCFlows* workflow.""" + from nipype.pipeline.engine import Workflow + + # Create parent workflow + workflow = Workflow(name="sdcflows_wf") + workflow.base_dir = work_dir + + for subject, sub_estimators in estimators.items(): + for estim in sub_estimators: + workflow.add_nodes([estim.get_workflow()]) + + return workflow From ec601f69ac567e8c3ce7a477d202a7d34b944493 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Wed, 29 Nov 2023 11:56:47 +0100 Subject: [PATCH 05/16] enh: add config module and execution a la MRIQC --- sdcflows/_warnings.py | 49 ++ sdcflows/cli/main.py | 423 ++++++---------- sdcflows/cli/parser.py | 352 ++++++++++++++ sdcflows/cli/workflow.py | 52 ++ sdcflows/config.py | 667 ++++++++++++++++++++++++++ sdcflows/data/config-example.toml | 53 ++ sdcflows/utils/telemetry.py | 63 +++ sdcflows/utils/tests/test_wrangler.py | 29 +- sdcflows/workflows/fit/base.py | 23 +- 9 files changed, 1422 insertions(+), 289 deletions(-) create mode 100644 sdcflows/_warnings.py create mode 100644 sdcflows/cli/parser.py create mode 100644 sdcflows/cli/workflow.py create mode 100644 sdcflows/config.py create mode 100644 sdcflows/data/config-example.toml create mode 100644 sdcflows/utils/telemetry.py diff --git a/sdcflows/_warnings.py b/sdcflows/_warnings.py new file mode 100644 index 0000000000..e9708230ac --- /dev/null +++ b/sdcflows/_warnings.py @@ -0,0 +1,49 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +# STATEMENT OF CHANGES: This file is derived from sources licensed under the Apache-2.0 terms, +# and this file has been changed. 
+# The original file this work derives from is found at: +# https://github.com/nipreps/mriqc/blob/8ceadba8669cc2a86119a97b9311ab968f11c6eb/mriqc/_warnings.py +"""Manipulate Python warnings.""" +import logging +import warnings + +_wlog = logging.getLogger("py.warnings") +_wlog.addHandler(logging.NullHandler()) + + +def _warn(message, category=None, stacklevel=1, source=None): + """Redefine the warning function.""" + if category is not None: + category = type(category).__name__ + category = category.replace("type", "WARNING") + + logging.getLogger("py.warnings").warning(f"{category or 'WARNING'}: {message}") + + +def _showwarning(message, category, filename, lineno, file=None, line=None): + _warn(message, category=category) + + +warnings.warn = _warn +warnings.showwarning = _showwarning diff --git a/sdcflows/cli/main.py b/sdcflows/cli/main.py index 5ca50055b7..4cbb0de5a8 100644 --- a/sdcflows/cli/main.py +++ b/sdcflows/cli/main.py @@ -21,290 +21,61 @@ # https://www.nipreps.org/community/licensing/ # """Standalone command line executable for estimation of fieldmaps.""" -import re -from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser -from functools import partial -from pathlib import Path - -from sdcflows import __version__ as thisversion - - -def _parse_participant_labels(value): - """ - Drop ``sub-`` prefix of participant labels. - - >>> _parse_participant_labels("s060") - ['s060'] - >>> _parse_participant_labels("sub-s060") - ['s060'] - >>> _parse_participant_labels("s060 sub-s050") - ['s050', 's060'] - >>> _parse_participant_labels("s060 sub-s060") - ['s060'] - >>> _parse_participant_labels("s060\tsub-s060") - ['s060'] - - """ - return sorted( - set( - re.sub(r"^sub-", "", item.strip()) - for item in re.split(r"\s+", f"{value}".strip()) - ) - ) - - -def _parser(): - class ParticipantLabelAction(Action): - def __call__(self, parser, namespace, values, option_string=None): - setattr(namespace, self.dest, _parse_participant_labels(" ".join(values))) - - def _path_exists(path, parser): - """Ensure a given path exists.""" - if path is None or not Path(path).exists(): - raise parser.error(f"Path does not exist: <{path}>.") - return Path(path).expanduser().absolute() - - def _min_one(value, parser): - """Ensure an argument is not lower than 1.""" - value = int(value) - if value < 1: - raise parser.error("Argument can't be less than one.") - return value - - def _to_gb(value): - scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9} - digits = "".join([c for c in value if c.isdigit()]) - n_digits = len(digits) - units = value[n_digits:] or "G" - return int(digits) * scale[units[0]] - - def _bids_filter(value): - from json import loads - - if value and Path(value).exists(): - return loads(Path(value).read_text()) - - parser = ArgumentParser( - description=f"""\ -SDCFlows {thisversion} - -Estimate fieldmaps available in a BIDS-compliant MRI dataset.""", - formatter_class=ArgumentDefaultsHelpFormatter, - ) - PathExists = partial(_path_exists, parser=parser) - PositiveInt = partial(_min_one, parser=parser) - - parser.add_argument( - "bids_dir", - action="store", - type=PathExists, - help="The root folder of a BIDS valid dataset (sub-XXXXX folders should " - "be found at the top level in this folder).", - ) - parser.add_argument( - "output_dir", - action="store", - type=Path, - help="The directory where the output files " - "should be stored. 
If you are running group level analysis " - "this folder should be prepopulated with the results of the " - "participant level analysis.", - ) - parser.add_argument( - "analysis_level", - action="store", - nargs="+", - help="Level of the analysis that will be performed. " - "Multiple participant level analyses can be run independently " - "(in parallel) using the same output_dir.", - choices=["participant", "group"], - ) - - # optional arguments - parser.add_argument( - "--version", action="version", version=f"SDCFlows {thisversion}" - ) - parser.add_argument( - "-v", - "--verbose", - dest="verbose_count", - action="count", - default=0, - help="Increases log verbosity for each occurrence, debug level is -vvv.", - ) - - # main options - g_bids = parser.add_argument_group("Options related to BIDS") - g_bids.add_argument( - "--participant-label", - "--participant_label", - "--participant-labels", - "--participant_labels", - dest="participant_label", - action=ParticipantLabelAction, - nargs="+", - help="A space delimited list of participant identifiers or a single " - "identifier (the sub- prefix can be removed).", - ) - g_bids.add_argument( - "--bids-filter-file", - action="store", - type=Path, - metavar="PATH", - help="a JSON file describing custom BIDS input filter using pybids " - "{:{:,...},...} " - "(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)", - ) - g_bids.add_argument( - "--bids-database-dir", - metavar="PATH", - type=PathExists, - help="Path to an existing PyBIDS database folder, for faster indexing " - "(especially useful for large datasets).", - ) - - # General performance - g_perfm = parser.add_argument_group("Options to handle performance") - g_perfm.add_argument( - "--nprocs", - "--n_procs", - "--n_cpus", - "-n-cpus", - action="store", - type=PositiveInt, - help="""\ -Maximum number of simultaneously running parallel processes executed by *MRIQC* \ -(e.g., several instances of ANTs' registration). \ -However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \ -it also sets the maximum number of threads that simultaneously running processes \ -may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \ -8-CPU-threaded processes will be running at a given time). \ -Under this mode of operation, ``--nprocs`` sets the maximum number of processors \ -that can be assigned work within an *MRIQC* job, which includes all the processors \ -used by currently running single- and multi-threaded processes. \ -If ``None``, the number of CPUs available will be automatically assigned (which may \ -not be what you want in, e.g., shared systems like a HPC cluster.""", - ) - g_perfm.add_argument( - "--omp-nthreads", - "--ants-nthreads", - action="store", - type=PositiveInt, - help="""\ -Maximum number of threads that multi-threaded processes executed by *MRIQC* \ -(e.g., ANTs' registration) can use. 
\ -If ``None``, the number of CPUs available will be automatically assigned (which may \ -not be what you want in, e.g., shared systems like a HPC cluster.""", - ) - g_perfm.add_argument( - "--mem", - "--mem_gb", - "--mem-gb", - dest="memory_gb", - action="store", - type=_to_gb, - help="Upper bound memory limit for MRIQC processes.", - ) - g_perfm.add_argument( - "--testing", - dest="debug", - action="store_true", - default=False, - help="Use testing settings for a minimal footprint.", - ) - - g_perfm.add_argument( - "--pdb", - dest="pdb", - action="store_true", - default=False, - help="Open Python debugger (pdb) on exceptions.", - ) - - # Control instruments - g_outputs = parser.add_argument_group("Instrumental options") - g_outputs.add_argument( - "-w", - "--work-dir", - action="store", - type=Path, - default=Path("work").absolute(), - help="Path where intermediate results should be stored.", - ) - g_outputs.add_argument( - "-n", - "--dry-run", - action="store_true", - default=False, - help="only find estimable fieldmaps (that is, estimation is not triggered)", - ) - g_outputs.add_argument( - "--fmapless", - action="store_true", - default=False, - help="Allow fieldmap-less estimation", - ) - - return parser - - -def gen_layout(bids_dir, database_dir=None): - import re - from bids.layout import BIDSLayout, BIDSLayoutIndexer - - _indexer = BIDSLayoutIndexer( - validate=False, - ignore=( - "code", - "stimuli", - "sourcedata", - "models", - "derivatives", - re.compile(r"^\."), - re.compile( - r"sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|eeg|ieeg|meg|micr|perf)" - ), - ), - ) - - layout_kwargs = {"indexer": _indexer} - - if database_dir: - layout_kwargs["database_path"] = database_dir - - layout = BIDSLayout(bids_dir, **layout_kwargs) - return layout def main(argv=None): - """ - Parse a BIDS directory and print subject estimators - """ - from logging import DEBUG - from niworkflows.utils.bids import collect_participants - from sdcflows.utils.wrangler import find_estimators - from sdcflows.utils.misc import create_logger - - pargs = _parser().parse_args(argv) - - bids_dir = pargs.bids_dir.resolve(strict=True) - layout = gen_layout(bids_dir, pargs.bids_database_dir) - subjects = collect_participants(layout, pargs.participant_label) - logger = create_logger( - "sdcflow.wrangler", int(max(25 - 5 * pargs.verbose_count, DEBUG)) - ) - estimators_record = {} - for subject in subjects: - estimators_record[subject] = find_estimators( - layout=layout, - subject=subject, - fmapless=pargs.fmapless, - logger=logger, + """Entry point for SDCFlows' CLI.""" + import gc + import os + import sys + from tempfile import mktemp + import atexit + from sdcflows import config + from sdcflows.cli.parser import parse_args + + atexit.register(config.restore_env) + + # Run parser + parse_args(argv) + + if config.execution.pdb: + from mriqc.utils.debug import setup_exceptionhook + + setup_exceptionhook() + config.nipype.plugin = "Linear" + + # CRITICAL Save the config to a file. This is necessary because the execution graph + # is built as a separate process to keep the memory footprint low. The most + # straightforward way to communicate with the child process is via the filesystem. + # The config file name needs to be unique, otherwise multiple mriqc instances + # will create write conflicts. 
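+    # For orientation, the serialized TOML document groups settings by config
+    # section; an illustrative sketch (all values are hypothetical, see
+    # sdcflows/data/config-example.toml for a complete example):
+    #
+    #     [environment]
+    #     version = "2.x"
+    #
+    #     [execution]
+    #     bids_dir = "/data/bids"
+    #
+    #     [nipype]
+    #     nprocs = 8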
+    config_file = mktemp(
+        dir=config.execution.work_dir, prefix=".sdcflows.", suffix=".toml"
+    )
+    config.to_filename(config_file)
+    config.file_path = config_file
+    exitcode = 0
+
+    if config.workflow.analysis_level != ["participant"]:
+        raise ValueError("Analysis level can only be 'participant'")
+
+    if config.execution.dry_run:  # --dry-run: pretty print results
+        from niworkflows.utils.bids import collect_participants
+        from sdcflows.utils.wrangler import find_estimators
+
+        subjects = collect_participants(
+            config.execution.layout,
+            config.execution.participant_label,
+        )
+        estimators_record = {}
+        for subject in subjects:
+            estimators_record[subject] = find_estimators(
+                layout=config.execution.layout,
+                subject=subject,
+                fmapless=config.workflow.fmapless,
+                logger=config.loggers.cli,
+            )
+        print(f"Estimation for <{config.execution.bids_dir}> complete. Found:")
+        for subject, estimators in estimators_record.items():
+            print(f"\tsub-{subject}")
+            if not estimators:
+                print("\t\tNo estimators found")
+                continue
+            for estimator in estimators:
+                print(f"\t\t{estimator}")
+                for fl in estimator.sources:
+                    fl_relpath = fl.path.relative_to(config.execution.bids_dir / f"sub-{subject}")
+                    pe_dir = fl.metadata.get("PhaseEncodingDirection")
+                    print(f"\t\t\t{pe_dir}\t{fl_relpath}")
+        sys.exit(exitcode)
+
+    # Initialize process pool if multiprocessing
+    _pool = None
+    if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"):
+        from contextlib import suppress
+        import multiprocessing as mp
+        import multiprocessing.forkserver
+        from concurrent.futures import ProcessPoolExecutor
+
+        os.environ["OMP_NUM_THREADS"] = "1"
+
+        with suppress(RuntimeError):
+            mp.set_start_method("fork")
+        gc.collect()
+
+        _pool = ProcessPoolExecutor(
+            max_workers=config.nipype.nprocs,
+            initializer=config._process_initializer,
+            initargs=(config.file_path,),
+        )
+
+    if not config.execution.notrack:
+        from sdcflows.utils.telemetry import setup_migas
+
+        setup_migas()
+
+    # CRITICAL Call build_workflow(config_file, retval) in a subprocess.
+    # Because Python on Linux does not ever free virtual memory (VM), running the
+    # workflow construction jailed within a process preempts excessive VM buildup.
+    from multiprocessing import Manager, Process
+
+    with Manager() as mgr:
+        from sdcflows.cli.workflow import build_workflow
+
+        retval = mgr.dict()
+        p = Process(target=build_workflow, args=(str(config_file), retval))
+        p.start()
+        p.join()
+
+        sdcflows_wf = retval.get("workflow", None)
+        exitcode = p.exitcode or retval.get("return_code", 0)
+
+    # CRITICAL Load the config from the file. This is necessary because the ``build_workflow``
+    # function, executed in a separate process, may have changed the config (and thus the
+    # global state of SDCFlows).
+    config.load(config_file)
+
+    exitcode = exitcode or (sdcflows_wf is None) * os.EX_SOFTWARE
+    if exitcode != 0:
+        sys.exit(exitcode)

+    # Initialize nipype config
+    config.nipype.init()
+    # Make sure loggers are started
+    config.loggers.init()
+
+    # Resource management options
+    if config.nipype.plugin in ("MultiProc", "LegacyMultiProc") and (
+        1 < config.nipype.nprocs < config.nipype.omp_nthreads
+    ):
+        config.loggers.cli.warning(
+            "Per-process threads (--omp-nthreads=%d) exceed total "
+            "threads (--nthreads/--n_cpus=%d)",
+            config.nipype.omp_nthreads,
+            config.nipype.nprocs,
+        )
+
+    if sdcflows_wf is None:
+        sys.exit(os.EX_SOFTWARE)
+
+    if sdcflows_wf and config.execution.write_graph:
+        sdcflows_wf.write_graph(graph2use="colored", format="svg", simple_form=True)
+
+    # Clean up master process before running workflow, which may create forks
+    gc.collect()
+    # Run SDCFlows
+    _plugin = config.nipype.get_plugin()
+    if _pool:
+        from mriqc.engine.plugin import MultiProcPlugin
+
+        _plugin = {
+            "plugin": MultiProcPlugin(
+                pool=_pool, plugin_args=config.nipype.plugin_args
+            ),
+        }
+    sdcflows_wf.run(**_plugin)
+    config.loggers.cli.log(25, "Finished all workloads")


if __name__ == "__main__":
    main()
diff --git a/sdcflows/cli/parser.py b/sdcflows/cli/parser.py
new file mode 100644
index 0000000000..76fdb1e07d
--- /dev/null
+++ b/sdcflows/cli/parser.py
@@ -0,0 +1,352 @@
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+#
+# Copyright 2023 The NiPreps Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+# https://www.nipreps.org/community/licensing/
+#
+"""Standalone command line executable for estimation of fieldmaps."""
+import re
+from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser
+from functools import partial
+from pathlib import Path
+
+from sdcflows import config
+
+
+def _parse_participant_labels(value):
+    """
+    Drop ``sub-`` prefix of participant labels.
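+    Labels may be separated by any whitespace; the result is deduplicated and sorted.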
+ + >>> _parse_participant_labels("s060") + ['s060'] + >>> _parse_participant_labels("sub-s060") + ['s060'] + >>> _parse_participant_labels("s060 sub-s050") + ['s050', 's060'] + >>> _parse_participant_labels("s060 sub-s060") + ['s060'] + >>> _parse_participant_labels("s060\tsub-s060") + ['s060'] + + """ + return sorted( + set( + re.sub(r"^sub-", "", item.strip()) + for item in re.split(r"\s+", f"{value}".strip()) + ) + ) + + +def _parser(): + class ParticipantLabelAction(Action): + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, _parse_participant_labels(" ".join(values))) + + def _path_exists(path, parser): + """Ensure a given path exists.""" + if path is None or not Path(path).exists(): + raise parser.error(f"Path does not exist: <{path}>.") + return Path(path).expanduser().absolute() + + def _min_one(value, parser): + """Ensure an argument is not lower than 1.""" + value = int(value) + if value < 1: + raise parser.error("Argument can't be less than one.") + return value + + def _to_gb(value): + scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9} + digits = "".join([c for c in value if c.isdigit()]) + n_digits = len(digits) + units = value[n_digits:] or "G" + return int(digits) * scale[units[0]] + + def _bids_filter(value): + from json import loads + + if value and Path(value).exists(): + return loads(Path(value).read_text()) + + parser = ArgumentParser( + description=f"""\ +SDCFlows {config.environment.version} + +Estimate fieldmaps available in a BIDS-compliant MRI dataset.""", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + PathExists = partial(_path_exists, parser=parser) + PositiveInt = partial(_min_one, parser=parser) + + parser.add_argument( + "bids_dir", + action="store", + type=PathExists, + help="The root folder of a BIDS valid dataset (sub-XXXXX folders should " + "be found at the top level in this folder).", + ) + parser.add_argument( + "output_dir", + action="store", + type=Path, + help="The directory where the output files " + "should be stored. If you are running group level analysis " + "this folder should be prepopulated with the results of the " + "participant level analysis.", + ) + parser.add_argument( + "analysis_level", + action="store", + nargs="+", + help="Level of the analysis that will be performed. 
" + "Multiple participant level analyses can be run independently " + "(in parallel) using the same output_dir.", + choices=["participant", "group"], + ) + + # optional arguments + parser.add_argument( + "--version", action="version", version=f"SDCFlows {config.environment.version}" + ) + parser.add_argument( + "-v", + "--verbose", + dest="verbose_count", + action="count", + default=0, + help="Increases log verbosity for each occurrence, debug level is -vvv.", + ) + + # main options + g_bids = parser.add_argument_group("Options related to BIDS") + g_bids.add_argument( + "--participant-label", + "--participant_label", + "--participant-labels", + "--participant_labels", + dest="participant_label", + action=ParticipantLabelAction, + nargs="+", + help="A space delimited list of participant identifiers or a single " + "identifier (the sub- prefix can be removed).", + ) + g_bids.add_argument( + "--session", + action="store", + nargs="*", + type=str, + help="Filter input dataset by session label.", + ) + g_bids.add_argument( + "--bids-filter-file", + action="store", + type=Path, + metavar="PATH", + help="a JSON file describing custom BIDS input filter using pybids " + "{:{:,...},...} " + "(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)", + ) + g_bids.add_argument( + "--bids-database-dir", + metavar="PATH", + type=PathExists, + help="Path to an existing PyBIDS database folder, for faster indexing " + "(especially useful for large datasets).", + ) + g_bids.add_argument( + "--bids-database-wipe", + action="store_true", + default=False, + help="Wipe out previously existing BIDS indexing caches, forcing re-indexing.", + ) + + # General performance + g_perfm = parser.add_argument_group("Options to handle performance") + g_perfm.add_argument( + "--nprocs", + "--n_procs", + "--n_cpus", + "-n-cpus", + action="store", + type=PositiveInt, + help="""\ +Maximum number of simultaneously running parallel processes executed by *SDCFlows* \ +(e.g., several instances of ANTs' registration). \ +However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \ +it also sets the maximum number of threads that simultaneously running processes \ +may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \ +8-CPU-threaded processes will be running at a given time). \ +Under this mode of operation, ``--nprocs`` sets the maximum number of processors \ +that can be assigned work within an *SDCFlows* job, which includes all the processors \ +used by currently running single- and multi-threaded processes. \ +If ``None``, the number of CPUs available will be automatically assigned (which may \ +not be what you want in, e.g., shared systems like a HPC cluster.""", + ) + g_perfm.add_argument( + "--omp-nthreads", + "--ants-nthreads", + action="store", + type=PositiveInt, + help="""\ +Maximum number of threads that multi-threaded processes executed by *SDCFlows* \ +(e.g., ANTs' registration) can use. 
\ +If ``None``, the number of CPUs available will be automatically assigned (which may \ +not be what you want in, e.g., shared systems like a HPC cluster.""", + ) + g_perfm.add_argument( + "--mem", + "--mem_gb", + "--mem-gb", + dest="memory_gb", + action="store", + type=_to_gb, + help="Upper bound memory limit for SDCFlows processes.", + ) + g_perfm.add_argument( + "--testing", + dest="debug", + action="store_true", + default=False, + help="Use testing settings for a minimal footprint.", + ) + + g_perfm.add_argument( + "--pdb", + dest="pdb", + action="store_true", + default=False, + help="Open Python debugger (pdb) on exceptions.", + ) + + # Control instruments + g_outputs = parser.add_argument_group("Instrumental options") + g_outputs.add_argument( + "-w", + "--work-dir", + action="store", + type=Path, + default=Path("work").absolute(), + help="Path where intermediate results should be stored.", + ) + g_outputs.add_argument( + "-n", + "--dry-run", + action="store_true", + default=False, + help="only find estimable fieldmaps (that is, estimation is not triggered)", + ) + g_outputs.add_argument( + "--fmapless", + action="store_true", + default=False, + help="Allow fieldmap-less estimation", + ) + g_outputs.add_argument( + "--use-plugin", + action="store", + default=None, + type=Path, + help="Nipype plugin configuration file.", + ) + g_outputs.add_argument( + "--notrack", + action="store_true", + help="Opt-out of sending tracking information of this run to the NiPreps developers. This" + " information helps to improve MRIQC and provides an indicator of real world usage " + " crucial for obtaining funding.", + ) + + return parser + + +def parse_args(args=None, namespace=None): + """Parse args and run further checks on the command line.""" + from logging import DEBUG + from json import loads + + parser = _parser() + opts = parser.parse_args(args, namespace) + config.execution.log_level = int(max(25 - 5 * opts.verbose_count, DEBUG)) + config.from_dict(vars(opts)) + + # Load base plugin_settings from file if --use-plugin + if opts.use_plugin is not None: + from yaml import load as loadyml + + with open(opts.use_plugin) as f: + plugin_settings = loadyml(f) + _plugin = plugin_settings.get("plugin") + if _plugin: + config.nipype.plugin = _plugin + config.nipype.plugin_args = plugin_settings.get("plugin_args", {}) + config.nipype.nprocs = config.nipype.plugin_args.get( + "nprocs", config.nipype.nprocs + ) + + # Load BIDS filters + if opts.bids_filter_file: + config.execution.bids_filters = loads(opts.bids_filter_file.read_text()) + + bids_dir = config.execution.bids_dir + output_dir = config.execution.output_dir + work_dir = config.execution.work_dir + version = config.environment.version + + # Ensure input and output folders are not the same + if output_dir == bids_dir: + parser.error( + "The selected output folder is the same as the input BIDS folder. " + "Please modify the output path (suggestion: %s)." + % bids_dir + / "derivatives" + / ("sdcflows_%s" % version.split("+")[0]) + ) + + if bids_dir in work_dir.parents: + parser.error( + "The selected working directory is a subdirectory of the input BIDS folder. " + "Please modify the output path." 
+ ) + + # Setup directories + config.execution.log_dir = output_dir / "logs" + # Check and create output and working directories + config.execution.log_dir.mkdir(exist_ok=True, parents=True) + output_dir.mkdir(exist_ok=True, parents=True) + work_dir.mkdir(exist_ok=True, parents=True) + + # Force initialization of the BIDSLayout + config.execution.init() + + participant_label = config.execution.layout.get_subjects() + if config.execution.participant_label is not None: + selected_label = set(config.execution.participant_label) + missing_subjects = selected_label - set(participant_label) + if missing_subjects: + parser.error( + "One or more participant labels were not found in the BIDS directory: " + f"{', '.join(missing_subjects)}." + ) + participant_label = selected_label + + config.execution.participant_label = sorted(participant_label) + + # Handle analysis_level + analysis_level = set(config.workflow.analysis_level) + config.workflow.analysis_level = list(analysis_level) diff --git a/sdcflows/cli/workflow.py b/sdcflows/cli/workflow.py new file mode 100644 index 0000000000..3e2a7932ca --- /dev/null +++ b/sdcflows/cli/workflow.py @@ -0,0 +1,52 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Build the SDCFlows' standalone workflow.""" + + +def build_workflow(config_file, retval): + """Create the Nipype Workflow that supports the whole execution graph.""" + import os + # We do not need OMP > 1 for workflow creation + os.environ["OMP_NUM_THREADS"] = "1" + + from sdcflows import config + from sdcflows.workflows.fit.base import init_sdcflows_wf + + config.load(config_file) + # Initialize nipype config + config.nipype.init() + # Make sure loggers are started + config.loggers.init() + + config.loggers.cli.log(25, f"""\ +Running SDCFlows {config.environment.version}: + * BIDS dataset path: {config.execution.bids_dir}. + * Output folder: {config.execution.output_dir}. + * Analysis levels: {config.workflow.analysis_level}. +""") + + retval["return_code"] = 1 + retval["workflow"] = None + retval["workflow"] = init_sdcflows_wf() + retval["return_code"] = int(retval["workflow"] is None) + return retval diff --git a/sdcflows/config.py b/sdcflows/config.py new file mode 100644 index 0000000000..6d47287287 --- /dev/null +++ b/sdcflows/config.py @@ -0,0 +1,667 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+# https://www.nipreps.org/community/licensing/
+#
+# STATEMENT OF CHANGES: This file is derived from sources licensed under the Apache-2.0 terms,
+# and this file has been changed.
+# The original file this work derives from is found at:
+# https://github.com/nipreps/mriqc/blob/8ceadba8669cc2a86119a97b9311ab968f11c6eb/mriqc/config.py
+#
+# [Nov 2023] CHANGES:
+# * ENH: Adapt the config module structure from MRIQC to SDCFlows
+#
+# ORIGINAL WORK'S ATTRIBUTION NOTICE:
+#
+# Copyright 2023 The NiPreps Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+r"""
+A Python module to maintain unique, run-wide *SDCFlows* settings.
+
+This module implements the memory structures to keep a consistent, singleton config.
+Settings are passed across processes via the filesystem, and a copy of the settings for
+each run and subject is left under
+``<output_dir>/sub-<participant_id>/log/<run_unique_id>/sdcflows.toml``.
+Settings are stored using :abbr:`TOML (Tom's Obvious, Minimal Language)`.
+The module has a :py:func:`~sdcflows.config.to_filename` function to allow writing out
+the settings to hard disk in *TOML* format, which looks like:
+
+.. literalinclude:: ../sdcflows/data/config-example.toml
+   :language: toml
+   :name: sdcflows.toml
+   :caption: **Example file representation of SDCFlows settings**.
+
+This config file is used to pass the settings across processes,
+using the :py:func:`~sdcflows.config.load` function.
+
+Configuration sections
+----------------------
+.. autoclass:: environment
+   :members:
+.. autoclass:: execution
+   :members:
+.. autoclass:: workflow
+   :members:
+.. autoclass:: nipype
+   :members:
+
+Usage
+-----
+A config file is used to pass settings and collect information as the execution
+graph is built across processes.
+
+.. code-block:: Python
+
+    from sdcflows import config
+    config_file = mktemp(dir=config.execution.work_dir, prefix='.sdcflows.', suffix='.toml')
+    config.to_filename(config_file)
+    # Call build_workflow(config_file, retval) in a subprocess
+    with Manager() as mgr:
+        from .workflow import build_workflow
+        retval = mgr.dict()
+        p = Process(target=build_workflow, args=(str(config_file), retval))
+        p.start()
+        p.join()
+    config.load(config_file)
+    # Access configs from any code section as:
+    value = config.section.setting
+
+Logging
+-------
+.. autoclass:: loggers
+   :members:
+
+Other responsibilities
+----------------------
+The :py:mod:`config` is responsible for other convenience actions.
+
+ * Switching Python's :obj:`multiprocessing` to *forkserver* mode.
+ * Set up a filter for warnings as early as possible. + * Automated I/O magic operations. Some conversions need to happen in the + store/load processes (e.g., from/to :obj:`~pathlib.Path` \<-\> :obj:`str`, + :py:class:`~bids.layout.BIDSLayout`, etc.) + +""" +import os +import sys +from pathlib import Path +from time import strftime +from uuid import uuid4 + +try: + # This option is only available with Python 3.8 + from importlib.metadata import version as get_version +except ImportError: + from importlib_metadata import version as get_version + +# Ignore annoying warnings +from sdcflows._warnings import logging + +__version__ = get_version("sdcflows") +_pre_exec_env = dict(os.environ) + +# Reduce numpy's vms by limiting OMP_NUM_THREADS +_default_omp_threads = int(os.getenv("OMP_NUM_THREADS", os.cpu_count())) + +# Disable NiPype etelemetry always +_disable_et = bool( + os.getenv("NO_ET") is not None or os.getenv("NIPYPE_NO_ET") is not None +) +os.environ["NIPYPE_NO_ET"] = "1" +os.environ["NO_ET"] = "1" + +if not hasattr(sys, "_is_pytest_session"): + sys._is_pytest_session = False # Trick to avoid sklearn's FutureWarnings +# Disable all warnings in main and children processes only on production versions +if not any( + ( + "+" in __version__, + __version__.endswith(".dirty"), + os.getenv("SDCFLOWS_DEV", "0").lower() in ("1", "on", "true", "y", "yes"), + ) +): + os.environ["PYTHONWARNINGS"] = "ignore" + +logging.addLevelName(25, "IMPORTANT") # Add a new level between INFO and WARNING +logging.addLevelName(15, "VERBOSE") # Add a new level between INFO and DEBUG + +DEFAULT_MEMORY_MIN_GB = 0.01 + +_exec_env = os.name +_docker_ver = None +# special variable set in the container +if os.getenv("IS_DOCKER_8395080871"): + _exec_env = "singularity" + _cgroup = Path("/proc/1/cgroup") + if _cgroup.exists() and "docker" in _cgroup.read_text(): + _docker_ver = os.getenv("DOCKER_VERSION_8395080871") + _exec_env = "docker" + del _cgroup + +_templateflow_home = Path( + os.getenv( + "TEMPLATEFLOW_HOME", + os.path.join(os.getenv("HOME"), ".cache", "templateflow"), + ) +) + +try: + from psutil import virtual_memory + + _free_mem_at_start = round(virtual_memory().free / 1024**3, 1) +except Exception: + _free_mem_at_start = None + +_oc_limit = "n/a" +_oc_policy = "n/a" +try: + # Memory policy may have a large effect on types of errors experienced + _proc_oc_path = Path("/proc/sys/vm/overcommit_memory") + if _proc_oc_path.exists(): + _oc_policy = {"0": "heuristic", "1": "always", "2": "never"}.get( + _proc_oc_path.read_text().strip(), "unknown" + ) + if _oc_policy != "never": + _proc_oc_kbytes = Path("/proc/sys/vm/overcommit_kbytes") + if _proc_oc_kbytes.exists(): + _oc_limit = _proc_oc_kbytes.read_text().strip() + if ( + _oc_limit in ("0", "n/a") + and Path("/proc/sys/vm/overcommit_ratio").exists() + ): + _oc_limit = "{}%".format( + Path("/proc/sys/vm/overcommit_ratio").read_text().strip() + ) +except Exception: + pass + +_memory_gb = None +try: + if "linux" in sys.platform: + with open("/proc/meminfo", "r") as f_in: + _meminfo_lines = f_in.readlines() + _mem_total_line = [line for line in _meminfo_lines if "MemTotal" in line][0] + _mem_total = float(_mem_total_line.split()[1]) + _memory_gb = _mem_total / (1024.0**2) + elif "darwin" in sys.platform: + _mem_str = os.popen("sysctl hw.memsize").read().strip().split(" ")[-1] + _memory_gb = float(_mem_str) / (1024.0**3) +except Exception: + pass + +file_path: Path = None +""" +Path to configuration file. 
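+The command line sets this attribute right after serializing the settings with
+:py:func:`to_filename`; it remains ``None`` until then.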
+""" + + +class _Config: + """An abstract class forbidding instantiation.""" + + _paths = tuple() + + def __init__(self): + """Avert instantiation.""" + raise RuntimeError("Configuration type is not instantiable.") + + @classmethod + def load(cls, settings, init=True): + """Store settings from a dictionary.""" + for k, v in settings.items(): + if v is None: + continue + if k in cls._paths: + setattr(cls, k, Path(v).absolute()) + continue + if hasattr(cls, k): + setattr(cls, k, v) + + if init: + try: + cls.init() + except AttributeError: + pass + + @classmethod + def get(cls): + """Return defined settings.""" + out = {} + for k, v in cls.__dict__.items(): + if k.startswith("_") or v is None: + continue + if callable(getattr(cls, k)): + continue + if k in cls._paths: + v = str(v) + out[k] = v + return out + + +class environment(_Config): + """ + Read-only options regarding the platform and environment. + + Crawls runtime descriptive settings (e.g., default FreeSurfer license, + execution environment, nipype and *SDCFlows* versions, etc.). + The ``environment`` section is not loaded in from file, + only written out when settings are exported. + This config section is useful when reporting issues, + and these variables are tracked whenever the user does not + opt-out using the ``--notrack`` argument. + + """ + + cpu_count = os.cpu_count() + """Number of available CPUs.""" + exec_docker_version = _docker_ver + """Version of Docker Engine.""" + exec_env = _exec_env + """A string representing the execution platform.""" + free_mem = _free_mem_at_start + """Free memory at start.""" + overcommit_policy = _oc_policy + """Linux's kernel virtual memory overcommit policy.""" + overcommit_limit = _oc_limit + """Linux's kernel virtual memory overcommit limits.""" + nipype_version = get_version("nipype") + """Nipype's current version.""" + templateflow_version = get_version("templateflow") + """The TemplateFlow client version installed.""" + total_memory = _memory_gb + """Total memory available, in GB.""" + version = __version__ + """*SDCFlows*' version.""" + _pre_sdcflows = _pre_exec_env + """Environment variables before *SDCFlows*' execution.""" + + +class nipype(_Config): + """Nipype settings.""" + + crashfile_format = "txt" + """The file format for crashfiles, either text or pickle.""" + get_linked_libs = False + """Run NiPype's tool to enlist linked libraries for every interface.""" + local_hash_check = True + """Check if interface is cached locally before executing.""" + memory_gb = None + """Estimation in GB of the RAM this workflow can allocate at any given time.""" + nprocs = os.cpu_count() + """Number of processes (compute tasks) that can be run in parallel (multiprocessing only).""" + omp_nthreads = _default_omp_threads + """Number of CPUs a single process can access for multithreaded execution.""" + plugin = "MultiProc" + """NiPype's execution plugin.""" + plugin_args = { + "maxtasksperchild": 1, + "raise_insufficient": False, + } + """Settings for NiPype's execution plugin.""" + remove_node_directories = False + """Remove directories whose outputs have already been used up.""" + resource_monitor = False + """Enable resource monitor.""" + stop_on_first_crash = True + """Whether the workflow should stop or continue after the first error.""" + + @classmethod + def get_plugin(cls): + """Format a dictionary for Nipype consumption.""" + out = { + "plugin": cls.plugin, + "plugin_args": cls.plugin_args, + } + if cls.plugin in ("MultiProc", "LegacyMultiProc"): + out["plugin_args"]["n_procs"] = 
int(cls.nprocs) + if cls.memory_gb: + out["plugin_args"]["memory_gb"] = float(cls.memory_gb) + return out + + @classmethod + def init(cls): + """Set NiPype configurations.""" + from nipype import config as ncfg + + # Nipype config (logs and execution) + ncfg.update_config( + { + "execution": { + "crashdump_dir": str(execution.log_dir), + "crashfile_format": cls.crashfile_format, + "get_linked_libs": cls.get_linked_libs, + "stop_on_first_crash": cls.stop_on_first_crash, + } + } + ) + + +class execution(_Config): + """Configure run-level settings.""" + + ants_float = False + """Use float number precision for ANTs computations.""" + bids_dir = None + """An existing path to the dataset, which must be BIDS-compliant.""" + bids_database_dir = None + """Path to the directory containing SQLite database indices for the input BIDS dataset.""" + bids_database_wipe = False + """Wipe out previously existing BIDS indexing caches, forcing re-indexing.""" + bids_description_hash = None + """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset.""" + bids_filters = None + """A dictionary describing custom BIDS input filter using PyBIDS.""" + cwd = os.getcwd() + """Current working directory.""" + debug = False + """Run in sloppy mode (meaning, suboptimal parameters that minimize run-time).""" + dry_run = False + """Just test, do not run.""" + layout = None + """A :py:class:`~bids.layout.BIDSLayout` object, see :py:func:`init`.""" + log_dir = None + """The path to a directory that contains execution logs.""" + log_level = 25 + """Output verbosity.""" + notrack = False + """Disable the sharing of usage information with developers.""" + output_dir = None + """Folder where derivatives will be stored.""" + participant_label = None + """List of participant identifiers that are to be preprocessed.""" + pdb = False + """Drop into PDB when exceptions are encountered.""" + run_uuid = "{}_{}".format(strftime("%Y%m%d-%H%M%S"), uuid4()) + """Unique identifier of this particular run.""" + session = None + """Filter input dataset by session identifier.""" + templateflow_home = _templateflow_home + """The root folder of the TemplateFlow client.""" + work_dir = Path("work").absolute() + """Path to a working directory where intermediate results will be available.""" + write_graph = False + """Write out the computational graph corresponding to the planned preprocessing.""" + + _layout = None + + _paths = ( + "bids_dir", + "bids_database_dir", + "layout", + "log_dir", + "output_dir", + "templateflow_home", + "work_dir", + ) + + @classmethod + def init(cls): + """Create a new BIDS Layout accessible with :attr:`~execution.layout`.""" + + if cls.bids_filters is None: + cls.bids_filters = {} + + if cls._layout is None: + import re + from bids.layout.index import BIDSLayoutIndexer + from bids.layout import BIDSLayout + + ignore_paths = [ + # Ignore folders at the top if they don't start with /sub-