diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py
index 5fca09696..7586a3cef 100644
--- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py
+++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py
@@ -3,15 +3,19 @@
 It also does accounting of memory and CPU usage
 It loads the config_general file to figure out what files it should look for and processes source name and time range
 For the moment it ignores date_list and skip_*_runs
+
+It can also update the h5 file with the list of runs to process
 """
 import argparse
 import glob
+import json
 import os
 import re
 from datetime import datetime, timedelta
 from subprocess import PIPE, run
 
 import numpy as np
+import pandas as pd
 import yaml
 
 from magicctapipe import __version__
@@ -79,10 +83,45 @@ def main():
         help="No CPU/Memory usage check (faster)",
     )
 
+    parser.add_argument(
+        "--run-list-file",
+        "-r",
+        dest="run_list",
+        type=str,
+        default=None,
+        help="h5 file with run list",
+    )
+
     args = parser.parse_args()
     with open(args.config_file, "r") as f:
         config = yaml.safe_load(f)
 
+    if args.run_list is not None:
+        try:
+            h5key = "joint_obs"
+            run_key = "LST1_run"
+            ismagic = False
+            for magic in [1, 2]:
+                if args.data_level[-2:] == f"M{magic}":
+                    h5key = f"MAGIC{magic}/runs_M{magic}"
+                    run_key = "Run ID"
+                    ismagic = True
+
+            h5runs = pd.read_hdf(args.run_list, key=h5key)
+        except (FileNotFoundError, KeyError):
+            print(f"Cannot open {h5key} in {args.run_list}")
+            exit(1)
+
+        rc_col = "DL1_rc" if ismagic else args.data_level + "_rc"
+
+        if rc_col not in h5runs.keys():
+            h5runs[rc_col] = "{}"
+            h5runs[rc_col + "_all"] = None
+
+        rc_dicts = {}
+        for rrun, dct in np.array(h5runs[[run_key, rc_col]]):
+            rc_dicts[rrun] = json.loads(dct)
+
     # TODO: those variables will be needed when more features are implemented
     source_out = config["data_selection"]["source_name_output"]
     timerange = config["data_selection"]["time_range"]
@@ -127,9 +166,10 @@ def main():
     total_time = 0
     all_jobs = []
     for dir in dirs:
-        this_date = re.sub(f".+/{args.data_level}/", "", dir)
-        this_date = re.sub(r"\D", "", this_date.split("/")[0])
-        this_date = datetime.strptime(this_date, "%Y%m%d")
+        this_date_str = re.sub(f".+/{args.data_level}/", "", dir)
+        this_date_str = re.sub(r"\D", "", this_date_str.split("/")[0])
+        this_date = datetime.strptime(this_date_str, "%Y%m%d")
+
         if timerange and (this_date < timemin or this_date > timemax):
             continue
 
@@ -159,6 +199,19 @@ def main():
                 file_in = line[0]
                 slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1]
                 rc = line[-1]
+
+                if args.run_list is not None:
+                    if ismagic:
+                        run_subrun = file_in.split("/")[-1].split("_")[2]
+                        this_run = int(run_subrun.split(".")[0])
+                        this_subrun = int(run_subrun.split(".")[1])
+                    else:
+                        filename = file_in.split("/")[-1]
+                        this_run = filename.split(".")[1].replace("Run", "")
+                        this_subrun = int(filename.split(".")[2])
+
+                    rc_dicts[this_run][str(this_subrun)] = rc
+
                 if rc == "0":
                     this_good += 1
                     # now check accounting
@@ -243,6 +296,23 @@ def main():
             f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}, total={total_time:.2f} CPU hrs; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}"
         )
 
+    if args.run_list is not None:
+        print("Updating the database")
+        for rrun in rc_dicts.keys():
+            idx = h5runs[run_key] == rrun
+            h5runs.loc[idx, rc_col] = json.dumps(rc_dicts[rrun])
+            if ismagic:
+                all_subruns = np.array(h5runs[idx]["number of subruns"])[0]
+            else:
+                all_subruns = len(rc_dicts[rrun])
+            good_subruns = sum(np.array(list(rc_dicts[rrun].values())) == "0")
+            isgood = np.logical_and(good_subruns == all_subruns, good_subruns > 0)
+            h5runs.loc[idx, rc_col + "_all"] = isgood
+
+        # fixme: for DL1/M[12] files, since there are two dataframes in the file, we need to append,
+        # and this causes an increase in the file size every time the file is updated
+        h5runs.to_hdf(args.run_list, key=h5key, mode="r+")
+
 
 if __name__ == "__main__":
     main()
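
For reference, a minimal sketch of inspecting the run list after the script has updated it. It assumes an LST-style run list (key "joint_obs", run column "LST1_run") and a data level of "DL1Stereo"; the file name "runs.h5" and that data level are placeholders, while the "_rc"/"_rc_all" column convention follows the diff above.

import json

import pandas as pd

# Load the run list that job_accounting.py updated in place
# ("runs.h5" is a placeholder file name).
h5runs = pd.read_hdf("runs.h5", key="joint_obs")

for _, row in h5runs.iterrows():
    # Each <data_level>_rc cell stores a JSON dict mapping subrun -> slurm return code.
    rc = json.loads(row["DL1Stereo_rc"])
    failed = [subrun for subrun, code in rc.items() if code != "0"]
    # <data_level>_rc_all is True only if every subrun finished with rc == "0".
    if row["DL1Stereo_rc_all"]:
        print(f"LST1 run {row['LST1_run']}: all subruns OK")
    else:
        print(f"LST1 run {row['LST1_run']}: failed subruns {failed}")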