#!/usr/bin/env python
from argparse import ArgumentParser
parser = ArgumentParser(usage="%(prog)s [args] samplename1 samplename2 ...")
parser.add_argument("-v", "--verbose", action="store_true",
                    help="verbose output", default=False)
parser.add_argument("-n", "--nproc", type=int,
                    help="number of students (parallel processes)", default=1)
parser.add_argument("--events", type=int,
                    help="number of events to process", default=-1)
parser.add_argument("--queue", action="store_true",
                    help="use a queue to feed files to the students",
                    default=False)
parser.add_argument("--profile", action="store_true",
                    help="profile execution time of each student's work method",
                    default=False)
parser.add_argument("--random-sample", type=int,
                    help=("length of a random sampling of the "
                          "input files to process"), default=None)
parser.add_argument("--nice", type=int,
                    help="niceness (priority adjustment) of the students",
                    default=0)
parser.add_argument("-p", "--periods",
                    help=("data periods separated by commas, or all "
                          "periods by default if not specified"),
                    default=None)
parser.add_argument("-r", "--runs",
                    help=("data runs separated by commas "
                          "(must not also specify periods)"), default=None)
parser.add_argument("--suffix",
                    help="suffix appended to the sample name", default=None)
parser.add_argument("--output-path",
                    help="directory in which to write the output",
                    default='.',
                    dest='outputpath')
parser.add_argument("--split",
                    help=("n:k: split the files into n equal subsets and "
                          "run on the kth subset"), default=None)
parser.add_argument("-s", "--student",
                    help=("the file (excluding the .py extension) containing "
                          "a class of the same name inheriting from "
                          "rootpy.batch.Student"), required=True)
parser.add_argument("--db",
                    help=("name of the datasets database file (excluding "
                          "the .yml extension)"), default='datasets')
parser.add_argument('datasets', nargs='+')
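
# arguments argparse does not recognize are collected in user_args and
# forwarded verbatim to the students via the supervisor's 'options' below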
args, user_args = parser.parse_known_args()
import sys
import os
import glob
import random
import multiprocessing

from higgstautau.batch import ATLASSupervisor
from higgstautau import datasets

# allow importing the student module from the current directory
sys.path.insert(0, '.')
if len(args.datasets) == 1:
    # the special name 'all' expands to every dataset under $DATAROOT
    # that contains a meta.yml file
    if args.datasets[0].lower() == 'all':
        dataroot = os.getenv('DATAROOT', None)
        if dataroot is None:
            sys.exit("$DATAROOT not defined!")
        names = []
        for dirname in glob.glob(os.path.join(dataroot, '*')):
            print dirname
            if os.path.isfile(os.path.join(dirname, 'meta.yml')):
                names.append(os.path.basename(dirname))
        args.datasets = names
db = datasets.Database(args.db)

# expand globs: each name may match multiple datasets in the database
_datasets = []
for dataset in args.datasets:
    result = db.search(dataset)
    if not result:
        sys.exit("No datasets matching %s" % dataset)
    _datasets += result
def get_chunk(l, n, k):
    """
    Get the kth chunk (1-indexed) of a list split into n equal sublists.
    The last chunk (k == n) extends to the end of the list so that any
    remainder is not dropped.
    """
    step = len(l) // n
    start = step * (k - 1)
    if k == n:
        stop = None
    else:
        stop = step * k
    return l[start:stop]
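
# For example, with 10 files and --split 3:2, step = 10 // 3 = 3, so this
# run covers files[3:6]; the --split 3:3 run (k == n) takes files[6:],
# absorbing the remainder so no file is dropped.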
if args.nproc < 1:
    args.nproc = 1
supervisors = {}
for dataset in _datasets:
    parent_connection, child_connection = multiprocessing.Pipe()
    outputname = '_'.join(
        [dataset.name, args.suffix]) if args.suffix else dataset.name
    files = dataset.files
    if args.random_sample is not None:
        if args.random_sample < len(files):
            files = random.sample(files, args.random_sample)
    if args.split is not None:
        n, k = args.split.split(':')
        files = get_chunk(files, int(n), int(k))
        outputname += '_%d' % int(k)
    supervisor = ATLASSupervisor(
        student=args.student,
        outputname=outputname,
        outputpath=args.outputpath,
        files=files,
        metadata=dataset,
        nstudents=args.nproc,
        connection=child_connection,
        queuemode=args.queue,
        profile=args.profile,
        grl=dataset.grl,
        events=args.events,
        nice=args.nice,
        options=user_args)
    # skip this dataset if the output file already exists
    fulloutputname = os.path.join(
        args.outputpath, supervisor.outputname + '.root')
    if os.path.exists(fulloutputname):
        print ("Skipping dataset %s since the output file "
               "%s already exists...") % (
            dataset.name, fulloutputname)
        print "Delete or move it if you want to rerun on this dataset."
        continue
    supervisors[parent_connection] = supervisor
for conn, supervisor in supervisors.items():
    print "Processing %s..." % supervisor.metadata.name
    supervisor.start()
try:
    for conn, supervisor in supervisors.items():
        supervisor.join()
except (KeyboardInterrupt, SystemExit):
    print "Cleaning up..."
    for conn, supervisor in supervisors.items():
        # signal each supervisor to shut down its students cleanly
        conn.send(None)
        supervisor.join()
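
# Typical invocation (the student and sample names here are hypothetical):
#   ./run -s MyStudent -n 4 --split 3:1 sample1 sample2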