Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add migas telemetry in addition to sentry #2817

Merged
merged 5 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ jobs:
- FS_LICENSE: /tmp/fslicense/license.txt
- DATASET: ds005
- CHECK_DS005: true
- MIGAS_OPTOUT: 1
steps:
- checkout:
path: *src
Expand Down Expand Up @@ -593,6 +594,7 @@ jobs:
- FS_LICENSE: /tmp/fslicense/license.txt
- DATASET: ds054
- CHECK_DS054: true
- MIGAS_OPTOUT: 1
steps:
- checkout:
path: *src
Expand Down Expand Up @@ -759,7 +761,7 @@ jobs:
--output-layout legacy \
--fs-no-reconall --sloppy --write-graph \
--output-spaces MNI152NLin2009cAsym \
--mem-mb 14336 --nthreads 4 --anat-only -vv
--mem-mb 14336 --nthreads 4 --anat-only -vv --notrack
fi
- run:
name: Clean-up after anatomical run
Expand Down
54 changes: 46 additions & 8 deletions fmriprep/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"""fMRI preprocessing workflow."""
from .. import config

EXITCODE: int = -1
effigies marked this conversation as resolved.
Show resolved Hide resolved


def main():
"""Entry point."""
Expand Down Expand Up @@ -61,11 +63,15 @@ def main():

sentry_sdk = None
if not config.execution.notrack and not config.execution.debug:
import atexit

import sentry_sdk

from ..utils.sentry import sentry_setup
from ..utils.telemetry import sentry_setup, setup_migas

sentry_setup()
setup_migas(init_ping=True)
atexit.register(_migas_exit)
mgxd marked this conversation as resolved.
Show resolved Hide resolved

# CRITICAL Save the config to a file. This is necessary because the execution graph
# is built as a separate process to keep the memory footprint low. The most
Expand All @@ -91,7 +97,8 @@ def main():
else:
retval = build_workflow(str(config_file), {})

retcode = retval.get("return_code", 0)
global EXITCODE
EXITCODE = retval.get("return_code", 0)
fmriprep_wf = retval.get("workflow", None)

# CRITICAL Load the config from the file. This is necessary because the ``build_workflow``
Expand All @@ -100,14 +107,14 @@ def main():
config.load(config_file)

if config.execution.reports_only:
sys.exit(int(retcode > 0))
sys.exit(int(EXITCODE > 0))

if fmriprep_wf and config.execution.write_graph:
fmriprep_wf.write_graph(graph2use="colored", format="svg", simple_form=True)

retcode = retcode or (fmriprep_wf is None) * EX_SOFTWARE
if retcode != 0:
sys.exit(retcode)
EXITCODE = EXITCODE or (fmriprep_wf is None) * EX_SOFTWARE
if EXITCODE != 0:
sys.exit(EXITCODE)

# Generate boilerplate
with Manager() as mgr:
Expand All @@ -118,7 +125,7 @@ def main():
p.join()

if config.execution.boilerplate_only:
sys.exit(int(retcode > 0))
sys.exit(int(EXITCODE > 0))

# Clean up master process before running workflow, which may create forks
gc.collect()
Expand All @@ -141,7 +148,7 @@ def main():
fmriprep_wf.run(**config.nipype.get_plugin())
except Exception as e:
if not config.execution.notrack:
from ..utils.sentry import process_crashfile
from ..utils.telemetry import process_crashfile

crashfolders = [
config.execution.fmriprep_dir
Expand Down Expand Up @@ -221,6 +228,37 @@ def main():
sys.exit(int((errno + failed_reports) > 0))


def migas_exit() -> None:
"""
Send a final crumb to the migas server signaling if the run successfully completed
This function should be registered with `atexit` to run at termination.
"""
import sys

from ..utils.telemetry import send_breadcrumb

global EXITCODE
migas_kwargs = {'status': 'C'}
# `sys` will not have these attributes unless an error has been handled
if hasattr(sys, 'last_type'):
migas_kwargs = {
'status': 'F',
'status_desc': 'Finished with error(s)',
'error_type': sys.last_type,
'error_desc': sys.last_value,
}
elif EXITCODE != 0:
migas_kwargs.update(
{
'status': 'F',
'status_desc': f'Completed with exitcode {EXITCODE}',
}
)
else:
migas_kwargs['status_desc'] = 'Success'
send_breadcrumb(**migas_kwargs)


if __name__ == "__main__":
raise RuntimeError(
"fmriprep/cli/run.py should not be run directly;\n"
Expand Down
2 changes: 1 addition & 1 deletion fmriprep/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class execution(_Config):
md_only_boilerplate = False
"""Do not convert boilerplate from MarkDown to LaTex and HTML."""
notrack = False
"""Do not monitor *fMRIPrep* using Sentry.io."""
"""Do not collect telemetry information for *fMRIPrep*."""
track_carbon = False
"""Tracks power draws using CodeCarbon package."""
country_code = "CAN"
Expand Down
29 changes: 27 additions & 2 deletions fmriprep/utils/sentry.py → fmriprep/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
#
# https://www.nipreps.org/community/licensing/
#
"""Stripped out routines for Sentry."""
"""Stripped out routines for telemetry"""
import os
import re

from nibabel.optpkg import optional_package
from niworkflows.utils.misc import read_crashfile

from .. import config
from .. import __version__, config

sentry_sdk = optional_package("sentry_sdk")[0]
migas = optional_package("migas")[0]

CHUNK_SIZE = 16384
# Group common events with pre specified fingerprints
Expand Down Expand Up @@ -182,3 +183,27 @@ def _chunks(string, length=CHUNK_SIZE):

"""
return (string[i : i + length] for i in range(0, len(string), length))


def setup_migas(init: bool = True) -> None:
"""
Prepare the migas python client to communicate with a migas server.
If ``init`` is ``True``, send an initial breadcrumb.
"""
# generate session UUID from generated run UUID
session_id = None
if config.execution.run_uuid:
session_id = config.execution.run_uuid.split('_', 1)[-1]

migas.setup(session_id=session_id)
if init:
# send initial status ping
send_breadcrumb(status='R', status_desc='workflow start')
mgxd marked this conversation as resolved.
Show resolved Hide resolved


def send_breadcrumb(**kwargs) -> dict:
"""
Communicate with the migas telemetry server. This requires `migas.setup()` to be called.
"""
res = migas.add_project("nipreps/fmriprep", __version__, **kwargs)
return res
6 changes: 4 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ docs =
%(doc)s
duecredit = duecredit
resmon =
sentry = sentry-sdk ~= 1.3.0
telemetry =
migas >= 0.3.0
sentry-sdk ~= 1.3.0
tests =
coverage
codecov
Expand All @@ -84,7 +86,7 @@ all =
%(doc)s
%(duecredit)s
%(maint)s
%(sentry)s
%(telemetry)s
%(tests)s

[options.package_data]
Expand Down