# tc-decision.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import json
import os
import subprocess
import sys
from functools import reduce
from glob import glob

import jsone
import networkx as nx
import requests
import slugid
import yaml
TASKS_ROOT = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), 'taskcluster')
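
# Tasks are submitted straight to the Queue through the taskcluster-proxy
# endpoint that Taskcluster exposes inside the decision task as http://taskcluster/.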
TASKCLUSTER_API_BASEURL = 'http://taskcluster/queue/v1/task/%(task_id)s'


def string_to_dict(key, value):
    # Expand a dotted key like 'a.b.c' into nested dicts: {'a': {'b': {'c': value}}}
    parts = key.split('.')

    def pack(parts):
        if len(parts) == 1:
            return {parts[0]: value}
        elif len(parts):
            return {parts[0]: pack(parts[1:])}
        return parts

    return pack(parts)
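
# Example: string_to_dict('taskcluster.taskGroupId', 'ab12')
# -> {'taskcluster': {'taskGroupId': 'ab12'}}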


def merge_dicts(*dicts):
    if not reduce(lambda x, y: isinstance(y, dict) and x, dicts, True):
        raise TypeError("Object in *dicts not of type dict")

    if len(dicts) < 2:
        raise ValueError("Requires 2 or more dict objects")

    def merge(a, b):
        for d in set(a.keys()).union(b.keys()):
            if d in a and d in b:
                if type(a[d]) == type(b[d]):
                    if not isinstance(a[d], dict):
                        # Collapse equal scalars; keep differing ones as a sorted list
                        ret = sorted({a[d], b[d]})
                        if len(ret) == 1:
                            ret = ret[0]
                        yield (d, ret)
                    else:
                        yield (d, dict(merge(a[d], b[d])))
                else:
                    raise TypeError("Conflicting key:value type assignment", type(a[d]), a[d], type(b[d]), b[d])
            elif d in a:
                yield (d, a[d])
            elif d in b:
                yield (d, b[d])
            else:
                raise KeyError

    return reduce(lambda x, y: dict(merge(x, y)), dicts[1:], dicts[0])
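
# Example: merge_dicts({'a': 1, 'b': 1}, {'a': 1, 'b': 2})
# -> {'a': 1, 'b': [1, 2]}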


def taskcluster_event_context():
    das_context = {}

    # Pre-filtering: rename GITHUB_HEAD_USER so it nests as event.head.user.login
    for k in list(os.environ.keys()):
        if k == 'GITHUB_HEAD_USER':
            os.environ['GITHUB_HEAD_USER_LOGIN'] = os.environ[k]
            del os.environ['GITHUB_HEAD_USER']

    for k in os.environ.keys():
        if k == 'TASK_ID':
            parts = string_to_dict('taskcluster.taskGroupId', os.environ[k])
            das_context = merge_dicts(das_context, parts)

        if k.startswith('GITHUB_'):
            parts = string_to_dict(k.lower().replace('_', '.').replace('github', 'event'), os.environ[k])
            das_context = merge_dicts(das_context, parts)

    return das_context
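
# Example: with GITHUB_HEAD_BRANCH=main in the environment, the context
# gains {'event': {'head': {'branch': 'main'}}}.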


def load_specific_contextFile(file):
    specific_context = {}

    try:
        with open(os.path.join(TASKS_ROOT, file)) as src:
            specific_context = yaml.load(src, Loader=yaml.FullLoader)

        if specific_context is None:
            specific_context = {}
    except FileNotFoundError:
        specific_context = {}

    return specific_context


def defaultValues_build_context():
    return load_specific_contextFile('.build.yml')


def shared_context():
    return load_specific_contextFile('.shared.yml')


def create_task_payload(build, base_context):
    build_type = os.path.splitext(os.path.basename(build))[0]

    build_context = defaultValues_build_context()
    with open(build) as src:
        build_context['build'].update(yaml.load(src, Loader=yaml.FullLoader)['build'])

    # Be able to use what has been defined in base_context,
    # e.g., ${event.head.branch}
    build_context = jsone.render(build_context, base_context)

    template_context = {
        'taskcluster': {
            'taskId': as_slugid(build_type)
        },
        'build_type': build_type
    }

    with open(os.path.join(TASKS_ROOT, build_context['build']['template_file'])) as src:
        template = yaml.load(src, Loader=yaml.FullLoader)

    contexts = merge_dicts({}, base_context, template_context, build_context)
    for one_context in glob(os.path.join(TASKS_ROOT, '*.cyml')):
        with open(one_context) as src:
            contexts = merge_dicts(contexts, yaml.load(src, Loader=yaml.FullLoader))

    return jsone.render(template, contexts)
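
# Note: rendering happens in two json-e passes: the per-build context is first
# rendered against the GitHub event context, then the task template referenced
# by build.template_file is rendered against the merged contexts (including
# any *.cyml fragments under taskcluster/).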


def send_task(t):
    url = TASKCLUSTER_API_BASEURL % {'task_id': t['taskId']}
    del t['taskId']

    r = requests.put(url, json=t)

    print(url, r.status_code)
    if r.status_code != requests.codes.ok:
        print(json.dumps(t, indent=2))
        print(r.content)
        print(json.loads(r.content.decode())['message'])

    return r.status_code == requests.codes.ok


# Cache of generated task slugs, keyed by build name
slugids = {}


def as_slugid(name):
    if name not in slugids:
        # slugid.nice() returned bytes in slugid < 2.0, hence the decode()
        slugids[name] = slugid.nice().decode()
        print('cache miss', name, slugids[name])
    else:
        print('cache hit', name, slugids[name])

    return slugids[name]
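
# Calling as_slugid() twice with the same build name yields the same slug,
# so one task's template can reference another task's taskId (e.g. to
# declare dependencies) before that task has been submitted.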


def to_int(x):
    return int(x)


def functions_context():
    # Helper functions exposed to json-e templates through the rendering context
    return {
        'as_slugid': as_slugid,
        'to_int': to_int
    }


def is_dry_run():
    return (len(sys.argv) > 1) and (sys.argv[1] == '--dry')


def should_run():
    # Make a quick clone to fetch the last commit
    try:
        subprocess.check_call([
            'git', 'clone', '--quiet', '-b', os.environ.get('GITHUB_HEAD_BRANCH'),
            '--single-branch', os.environ.get('GITHUB_HEAD_REPO_URL'),
            '--depth=1', '/tmp/ds-clone/'
        ], env=dict(os.environ, GIT_LFS_SKIP_SMUDGE='1'))
    except subprocess.CalledProcessError as e:
        print("Error while git cloning:", e, file=sys.stderr)
        return False

    try:
        git_msg = subprocess.check_output([
            'git', '--git-dir=/tmp/ds-clone/.git/',
            'log', '--format=%b', '-n', '1',
            os.environ.get('GITHUB_HEAD_SHA')
        ]).decode('utf-8').strip().upper()
    except subprocess.CalledProcessError as e:
        print("Error while git log:", e, file=sys.stderr)
        return False

    print('Commit message:', git_msg)

    x_deepspeech = filter(lambda x: 'X-DEEPSPEECH:' in x, git_msg.split('\n'))
    if any('NOBUILD' in x for x in x_deepspeech):
        print('Not running anything according to commit message')
        return False

    return True
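
# A commit whose body contains a line such as "X-DeepSpeech: NOBUILD"
# (matching is effectively case-insensitive, since the message is upper-cased
# above) disables task creation for that push.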


if __name__ == '__main__':
    if not is_dry_run():
        # We might want to NOT run in some cases
        if not should_run():
            sys.exit(0)

    base_context = taskcluster_event_context()
    base_context = merge_dicts(base_context, functions_context())
    base_context = merge_dicts(base_context, shared_context())

    root_task = base_context['taskcluster']['taskGroupId']
    tasks_graph = nx.DiGraph()
    tasks = {}

    for build in glob(os.path.join(TASKS_ROOT, '*.yml')):
        t = create_task_payload(build, base_context)

        # We allow templates to produce completely empty output
        if not t:
            continue

        if 'dependencies' in t and len(t['dependencies']) > 0:
            for dep in t['dependencies']:
                tasks_graph.add_edge(t['taskId'], dep)
        else:
            tasks_graph.add_edge(t['taskId'], root_task)

        tasks[t['taskId']] = t

    # Edges point from a task to its dependencies, so a depth-first postorder
    # walk submits dependencies before the tasks that need them.
    for task in nx.dfs_postorder_nodes(tasks_graph):
        # root_task is the task group and also the task id that is already
        # running, so we don't have to schedule that
        if task == root_task:
            continue

        t = tasks[task]
        if is_dry_run():
            print(json.dumps(t, indent=2))
            continue

        p = send_task(t)
        if not p:
            sys.exit(1)
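

# Usage:
#   python tc-decision.py          submit the rendered task graph
#   python tc-decision.py --dry    print task payloads without submitting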