Updated user script for DQM job launcher #61

Merged: 8 commits, Jun 9, 2023
@@ -1,6 +1,6 @@
#!/bin/env bash
#
# Time-stamp: "2023-01-30 16:21:22 jlenain"
# Time-stamp: "2023-05-30 12:05:14 jlenain"


function usage ()
@@ -41,52 +41,75 @@ if [ -z $runnb ]; then
exit 1
fi

CONTAINER=nectarchain_w_2023_02.sif
WRAPPER="singularity_wrapper.sh"
CONTAINER="oras://ghcr.io/cta-observatory/nectarchain:latest"
OUTDIR=NectarCAM_DQM_Run${runnb}
DIRAC_OUTDIR=/vo.cta.in2p3.fr/user/j/jlenain/nectarcam/dqm

function exit_script() {
return_code=$1

# Some cleanup before leaving:
# [ -d $CONTAINER ] && rm -rf $CONTAINER
# [ -f $CONTAINER ] && rm -f $CONTAINER
[ -d $OUTDIR ] && rm -rf $OUTDIR
[ -f ${OUTDIR}.tar.gz ] && rm -f ${OUTDIR}.tar.gz
[ -d ${OUTDIR} ] && rm -rf ${OUTDIR}
[ -f $WRAPPER ] && rm -f $WRAPPER

exit $return_code
}

# Halim's DQM code needs to use a specific output directory:
export NECTARDIR=$PWD/$OUTDIR
[ ! -d $NECTARDIR ] && mkdir -p $NECTARDIR
mv nectarcam*.sqlite NectarCAM.Run*.fits.fz $NECTARDIR/.
[ ! -d $NECTARDIR ] && mkdir -p $NECTARDIR || exit_script $?
# mv nectarcam*.sqlite NectarCAM.Run*.fits.fz $NECTARDIR/.

LISTRUNS=""
for run in $NECTARDIR/NectarCAM.Run${runnb}.*.fits.fz; do
LISTRUNS="$LISTRUNS $run"
for run in $PWD/NectarCAM.Run${runnb}.*.fits.fz; do
LISTRUNS="$LISTRUNS $(basename $run)"
done

# Create a wrapper BASH script with cleaned environment, see https://redmine.cta-observatory.org/issues/51483
WRAPPER="sing.sh"
cat > $WRAPPER <<EOF
#!/bin/env bash
echo "Cleaning environment \$CLEANED_ENV"
[ -z "\$CLEANED_ENV" ] && exec /bin/env -i CLEANED_ENV="Done" HOME=\${HOME} SHELL=/bin/bash /bin/bash -l "\$0" "\$@"


# Some environment variables related to python, to be passed to container:
# Some environment variables related to Python, to be passed to the container, whether for old Singularity versions or recent Apptainer ones:
export SINGULARITYENV_MPLCONFIGDIR=/tmp
export SINGULARITYENV_NUMBA_CACHE_DIR=/tmp
export SINGULARITYENV_NECTARDIR=$NECTARDIR

export APPTAINERENV_MPLCONFIGDIR=/tmp
export APPTAINERENV_NUMBA_CACHE_DIR=/tmp
export APPTAINERENV_NECTARDIR=$NECTARDIR

# Handle Singularity or Apptainer case:
if command -v singularity &> /dev/null; then
CALLER=singularity
elif command -v apptainer &> /dev/null; then
CALLER=apptainer
else
echo "It seems neither Singularity nor Apptainer are available from here"
exit 1
fi

echo
echo "Running"
# Instantiate the nectarchain Singularity image, run our DQM example run within it:
cmd="singularity exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/nectarchain/dqm/start_calib.py $LISTRUNS"
cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_calib.py $PWD $NECTARDIR -i $LISTRUNS"
echo \$cmd
eval \$cmd
EOF

chmod u+x $WRAPPER
./${WRAPPER}
chmod u+x $WRAPPER || exit_script $?
./${WRAPPER} || exit_script $?


# Archive the output directory and push it to DIRAC before leaving the job:
tar zcf ${OUTDIR}.tar.gz ${OUTDIR}output/
dirac-dms-add-file ${DIRAC_OUTDIR}/${OUTDIR}.tar.gz ${OUTDIR}.tar.gz LPNHE-USER

# Some cleanup before leaving:
[ -d $CONTAINER ] && rm -rf $CONTAINER
[ -f $CONTAINER ] && rm -f $CONTAINER
[ -d $OUTDIR ] && rm -rf $OUTDIR
[ -d ${OUTDIR}output ] && rm -rf ${OUTDIR}output
[ -f $WRAPPER ] && rm -f $WRAPPER
tar zcf ${OUTDIR}.tar.gz ${OUTDIR}/ || exit_script $?
dirac-dms-add-file ${DIRAC_OUTDIR}/${OUTDIR}.tar.gz ${OUTDIR}.tar.gz LPNHE-USER || exit_script $?
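# [Illustrative note, not part of this diff] Once the job has run, the archive
# pushed above can be fetched back from DIRAC storage with the standard DMS
# command, e.g.:
#   dirac-dms-get-file ${DIRAC_OUTDIR}/${OUTDIR}.tar.gz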

exit_script 0
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Time-stamp: "2023-02-07 21:39:09 jlenain"
# Time-stamp: "2023-05-30 13:09:04 jlenain"

import argparse
import sys
@@ -10,6 +10,7 @@

# astropy imports
from astropy import time
from astropy import units as u

# DIRAC imports
from DIRAC.Interfaces.API.Dirac import Dirac
@@ -36,18 +37,17 @@
default=False,
help='dry run (does not actually submit jobs)')
parser.add_argument('--log',
default=logging.INFO,
default='info',
help='debug output',
type=str)
args = parser.parse_args()

logger.setLevel(args.log)
logger.setLevel(args.log.upper())

if args.date is None:
logger.critical('A date should be provided, in an astropy.time.Time-compliant format, e.g. "2022-04-01".')
sys.exit(1)

container="nectarchain_w_2023_02.sif"
executable_wrapper="dqm_processor.sh"

## Possible massive job processing via loop on run numbers:
@@ -57,6 +57,10 @@
processDate = time.Time(args.date)
dfcDir = f'/vo.cta.in2p3.fr/nectarcam/{processDate.ymdhms[0]}/{processDate.ymdhms[0]}{str(processDate.ymdhms[1]).zfill(2)}{str(processDate.ymdhms[2]).zfill(2)}'

# The relevant DB file may be stored in the directory corresponding to the day after:
processDateTomorrow = processDate + 1. * u.day
dfcDirTomorrow = f'/vo.cta.in2p3.fr/nectarcam/{processDateTomorrow.ymdhms[0]}/{processDateTomorrow.ymdhms[0]}{str(processDateTomorrow.ymdhms[1]).zfill(2)}{str(processDateTomorrow.ymdhms[2]).zfill(2)}'
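# [Illustrative example, not part of this diff] For --date "2023-05-30" the two
# locations probed on the DFC would be:
#   dfcDir         = '/vo.cta.in2p3.fr/nectarcam/2023/20230530'
#   dfcDirTomorrow = '/vo.cta.in2p3.fr/nectarcam/2023/20230531'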

# Sometimes, for unknown reasons, the connection to the DFC can fail, so try a few times:
sleep_time = 2
num_retries = 3
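# [Illustrative sketch, not part of this diff] The collapsed lines implement the
# retry logic; a minimal version, assuming DIRAC's FileCatalogClient, could look
# like:
#   from time import sleep
#   from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
#   for attempt in range(num_retries):
#       try:
#           dfc = FileCatalogClient()
#           break
#       except Exception as e:
#           logger.warning(f'DFC connection failed ({e}), retrying in {sleep_time} s')
#           sleep(sleep_time)
#   else:
#       logger.critical('Could not connect to the DFC, exiting...')
#       sys.exit(1)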
@@ -76,21 +80,28 @@
sys.exit(1)

infos = dfc.listDirectory(dfcDir)
infosTomorrow = dfc.listDirectory(dfcDirTomorrow)
if not infos['OK'] or not infos['Value']['Successful']:
logger.critical(f"Could not properly retrieve the file metadata for {dfcDir} ... Exiting !")
sys.exit(1)
if not infosTomorrow['OK'] or not infosTomorrow['Value']['Successful']:
logger.warning(f"Could not properly retrieve the file metadata for {dfcDirTomorrow} ... Continuing !")
meta = infos['Value']['Successful'][dfcDir]
metaTomorrow = infosTomorrow['Value']['Successful'][dfcDirTomorrow]

runlist = []

sqlfile = None
sqlfilelist = []
for f in meta['Files']:
if f.endswith('.fits.fz'):
run = f.split('NectarCAM.Run')[1].split('.')[0]
if run not in runlist and run is not None:
runlist.append(run)
if f.endswith('.sqlite'):
sqlfile = f
sqlfilelist.append(f)
for f in metaTomorrow['Files']:
if f.endswith('.sqlite'):
sqlfilelist.append(f)
if args.run is not None:
if args.run not in runlist:
logger.critical(f'Your specified run {args.run} was not found in {dfcDir}, aborting...')
@@ -99,10 +110,10 @@

logger.info(f'Found runs {runlist} in {dfcDir}')

if sqlfile is None:
logger.critical('Could not find any SQLite file in {dfcDir}, aborting...')
if len(sqlfilelist) == 0:
logger.critical(f'Could not find any SQLite file in {dfcDir} nor in {dfcDirTomorrow}, aborting...')
sys.exit(1)
logger.info(f'Found SQLite file {sqlfile} in {dfcDir}')
logger.info(f'Found SQLite files {sqlfilelist} in {dfcDir} and {dfcDirTomorrow}')

# Now, submit the DIRAC jobs:
# for run in ['2721']:
@@ -114,13 +125,13 @@
# j.setDestination('LCG.GRIF.fr')
j.setName(f'NectarCAM DQM run {run}')
j.setJobGroup('NectarCAM DQM')
sandboxlist = [f'{executable_wrapper}',
f'LFN:/vo.cta.in2p3.fr/user/j/jlenain/local/opt/singularity/nectarchain/{container}',
f'LFN:{sqlfile}']
sandboxlist = [f'{executable_wrapper}']
for f in meta['Files']:
if f.endswith('.fits.fz') and f'NectarCAM.Run{run}' in f:
sandboxlist.append(f'LFN:{f}')
if len(sandboxlist) < 4:
for s in sqlfilelist:
sandboxlist.append(f'LFN:{s}')
if len(sandboxlist) < 2:
logger.critical(f'''Malformed sandboxlist, actual data .fits.fz files missing:
{sandboxlist}

@@ -133,5 +144,5 @@
j.setInputSandbox(sandboxlist)

if not args.dry_run:
res = dirac.submitJob(j) #, mode='local') # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
res = dirac.submitJob(j) # , mode='local') # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
logger.info(f"Submission Result: {res['Value']}")