Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduced updating of the database #234

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
It also does accounting of memory and CPU usage
It loads the config_general file to figure out what files it should look for and processes source name and time range
For the moment it ignores date_list and skip_*_runs

It can also update the h5 file with the list of runs to process
"""
import argparse
import glob
import json
import os
import re
from datetime import datetime, timedelta
from subprocess import PIPE, run

import numpy as np
import pandas as pd
import yaml

from magicctapipe import __version__
Expand Down Expand Up @@ -79,10 +83,45 @@ def main():
help="No CPU/Memory usage check (faster)",
)

parser.add_argument(
"--run-list-file",
"-r",
dest="run_list",
type=str,
default=None,
help="h5 file with run list",
)

args = parser.parse_args()
with open(args.config_file, "r") as f:
config = yaml.safe_load(f)

if args.run_list is not None:
try:
h5key = "joint_obs"
run_key = "LST1_run"
ismagic = False
for magic in [1, 2]:
if args.data_level[-2:] == f"M{magic}":
h5key = f"MAGIC{magic}/runs_M{magic}"
run_key = "Run ID"
ismagic = True

h5runs = pd.read_hdf(args.run_list, key=h5key)
except (FileNotFoundError, KeyError):
print(f"Cannot open {h5key} in {args.run_list}")
exit(1)

rc_col = "DL1_rc" if ismagic else args.data_level + "_rc"

if rc_col not in h5runs.keys():
h5runs[rc_col] = "{}"
h5runs[rc_col + "_all"] = None

rc_dicts = {}
for rrun, dct in np.array(h5runs[[run_key, rc_col]]):
rc_dicts[rrun] = json.loads(dct)

# TODO: those variables will be needed when more features are implemented
source_out = config["data_selection"]["source_name_output"]
timerange = config["data_selection"]["time_range"]
Expand Down Expand Up @@ -127,9 +166,10 @@ def main():
total_time = 0
all_jobs = []
for dir in dirs:
this_date = re.sub(f".+/{args.data_level}/", "", dir)
this_date = re.sub(r"\D", "", this_date.split("/")[0])
this_date = datetime.strptime(this_date, "%Y%m%d")
this_date_str = re.sub(f".+/{args.data_level}/", "", dir)
this_date_str = re.sub(r"\D", "", this_date_str.split("/")[0])
this_date = datetime.strptime(this_date_str, "%Y%m%d")

if timerange and (this_date < timemin or this_date > timemax):
continue

Expand Down Expand Up @@ -159,6 +199,19 @@ def main():
file_in = line[0]
slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1]
rc = line[-1]

if args.run_list is not None:
if ismagic:
run_subrun = file_in.split("/")[-1].split("_")[2]
this_run = int(run_subrun.split(".")[0])
this_subrun = int(run_subrun.split(".")[1])
else:
filename = file_in.split("/")[-1]
this_run = filename.split(".")[1].replace("Run", "")
this_subrun = int(filename.split(".")[2])

rc_dicts[this_run][str(this_subrun)] = rc

if rc == "0":
this_good += 1
# now check accounting
Expand Down Expand Up @@ -243,6 +296,23 @@ def main():
f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}, total={total_time:.2f} CPU hrs; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}"
)

if args.run_list is not None:
print("Updating the database")
for rrun in rc_dicts.keys():
idx = h5runs[run_key] == rrun
h5runs.loc[idx, rc_col] = json.dumps(rc_dicts[rrun])
if ismagic:
all_subruns = np.array(h5runs[idx]["number of subruns"])[0]
else:
all_subruns = len(rc_dicts[rrun])
good_subruns = sum(np.array(list(rc_dicts[rrun].values())) == "0")
isgood = np.logical_and(good_subruns == all_subruns, good_subruns > 0)
h5runs.loc[idx, rc_col + "_all"] = isgood

# fixme: for DL1/M[12] files, since there are two dataframes in the file, we need to append it,
# and this causes an increase in the file size every time the file is updated
h5runs.to_hdf(args.run_list, key=h5key, mode="r+")


if __name__ == "__main__":
main()