-
Notifications
You must be signed in to change notification settings - Fork 42
/
test-harness
executable file
·284 lines (240 loc) · 8.88 KB
/
test-harness
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#!/usr/bin/env python3
# Path of the Links executable that evaluate() launches for every test.
links = './links'
import sys, re, time, os, signal
import concurrent.futures, threading, multiprocessing
from os import path
from subprocess import Popen, PIPE
# Global result counters; they are updated by OK() and evaluate() and
# reported by main() once all tests have run.
successes = failures = ignored = 0
TIMEOUT = 15 # seconds before a test is timed out.
# Held by evaluate() while it updates the counters above and prints results.
the_lock = threading.Lock()
class TestFileStream:
    """Custom stream for reading test files.

    Skips any line starting with # and turns any sequence of empty lines into a
    single empty line.

    Implements the relevant subset of the interface of the object you get back
    from open(file, 'r'): iteration, readline() and close(), plus peek_line()
    and use as a context manager.
    """

    # Value returned by readline() once the underlying stream is exhausted.
    EOF_MARKER = ""

    # Compiled once; readline() consults both for every physical line it reads.
    _COMMENT_RE = re.compile(r"^\s*#")
    _BLANK_RE = re.compile(r"^\s*$")

    def __init__(self, stream):
        """Wrap `stream`, which must provide readline(), tell() and seek()."""
        self.stream = stream
        # True when the line most recently returned by readline() was blank.
        self.prev_readline_was_empty = False

    def __iter__(self):
        return self

    def __next__(self):
        line = self.readline()
        if line == TestFileStream.EOF_MARKER:
            raise StopIteration
        return line

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the wrapped stream."""
        self.stream.close()

    @staticmethod
    def open_test_file(path):
        """Open the file at `path` for reading and wrap it in a TestFileStream.

        Declared static so that it can be called on the class as well as on
        an instance.
        """
        return TestFileStream(open(path, 'r'))

    def readline(self):
        """Return the next significant line, or EOF_MARKER at end of input.

        Comment lines are dropped, and a blank line is dropped when the line
        returned by the previous call was blank as well.
        """
        while True:
            line = self.stream.readline()
            if line == TestFileStream.EOF_MARKER:
                # Reaching the end leaves the blank-line state untouched.
                return line
            if self._COMMENT_RE.match(line):
                continue
            is_empty = self._BLANK_RE.match(line) is not None
            if is_empty and self.prev_readline_was_empty:
                continue
            self.prev_readline_was_empty = is_empty
            return line

    def peek_line(self):
        """Return what readline() would return, then rewind the stream.

        NOTE(review): only the stream position is restored; the blank-line
        state set by the inner readline() call is kept. After peeking at a
        blank line, the next readline() therefore skips that blank line.
        Existing callers appear to rely on this, so it is left as is.
        """
        pos = self.stream.tell()
        line = self.readline()
        self.stream.seek(pos)
        return line
def FAIL(name, msg):
    """Build the two printable lines describing one failed check.

    Returns a pair: a headline naming the test and a parenthesised detail
    line carrying `msg`. Nothing is printed here.
    """
    headline = ' error: %s' % name
    detail = ' (%s)' % msg
    return (headline, detail)
# must only be called while having the lock
def OK(name):
    """Record one passed test in the global counter and announce it."""
    global successes
    successes = successes + 1
    message = ' SUCCESS: %s' % name
    print(message)
def parse(stream):
    """Read test information separated by blank lines. The first line
    is the test name; the second is the test code; the remaining lines
    are auxiliary options of the form `key : value`.

    `stream` is any iterable of lines. Yields one `(name, code, opts)`
    tuple per test, where `opts` maps option names to their string values.

    Raises SystemExit when a test has no code line or an option line lacks
    the ` : ` separator.
    """
    def groups():
        # Collect consecutive non-blank lines; a blank line closes a group.
        current = []
        for line in stream:
            if line.strip():
                current.append(line)
            elif current:
                yield current
                current = []
            # A blank line while nothing has been collected is padding;
            # ignore it rather than indexing into an empty list.
        # Always hand over the tail, even if empty, so that input ending in
        # a blank line (or empty input) is still reported below.
        yield current

    for test in groups():
        if not test:
            print('WARNING: empty test', file=sys.stderr)
            continue
        name = test[0].strip()
        if len(test) < 2:
            raise SystemExit('Syntax error: test "%s" has no code line' % name)
        code = test[1].strip()
        opts = {}
        for line in test[2:]:
            key_value = line.rstrip().split(' : ', 1)
            if len(key_value) != 2:
                raise SystemExit(
                    'Syntax error in options of test "%s": %s' % (name, line))
            opts[key_value[0]] = key_value[1]
        yield name, code, opts
# Lock not required for calling
def check_expected(name, item, got, expected, errors):
    """Compare an observed value with its expectation.

    Trailing whitespace is removed from both values first. An expectation
    starting with '@' is treated as a regular expression that must match at
    the start of `got` (with '.' also matching newlines); any other
    expectation must equal `got` exactly.

    Returns True on a match. On a mismatch, appends a FAIL(...) entry that
    describes it to `errors` and returns False.
    """
    got = got.rstrip()
    expected = expected.rstrip()

    if expected.startswith('@'):
        pattern = expected[1:]
        if re.match(pattern, got, re.DOTALL):
            return True
        errors.append(FAIL(name, "Unexpected %s: expected `%s' (regex); got `%s'" %
                           (item, pattern, got)))
        return False

    if expected == got:
        return True
    errors.append(FAIL(name, "Unexpected %s:\nexpected `%s';\n got `%s'" % (item, expected, got)))
    return False
# Ensures "foo bar" is interpreted as a single argument and not two.
def patch_arguments(args=None):
    """Re-join whitespace-split tokens that belong to one quoted argument.

    A token starting with a double quote opens a quoted argument, which runs
    up to the next token ending with a double quote. The tokens in between
    are joined with single spaces and the two enclosing quotes are removed.
    A single token that both starts and ends with a quote, such as "foo",
    simply loses its quotes.

    If an opening quote is never closed, the remaining tokens are returned
    unchanged instead of being dropped.

    Returns a new list; `args` is not modified.
    """
    tokens = [] if args is None else list(args)
    result = []
    count = len(tokens)
    i = 0
    while i < count:
        token = tokens[i]
        if not token.startswith("\""):
            result.append(token)
            i += 1
            continue
        # Quoted argument contained in one token; a lone '"' is not closed.
        if len(token) >= 2 and token.endswith("\""):
            result.append(token[1:-1])
            i += 1
            continue
        closing = next(
            (j for j in range(i + 1, count) if tokens[j].endswith("\"")),
            None)
        if closing is None:
            # Unterminated quote: keep what the caller supplied verbatim.
            result.extend(tokens[i:])
            break
        components = [token[1:]] + tokens[i + 1:closing] + [tokens[closing][:-1]]
        result.append(" ".join(components))
        i = closing + 1
    return result
def split_arguments(args = []):
    """Split an argument list at the first "--" token.

    Returns a pair `(links_args, rest_args)`: everything before the first
    "--", and everything from that "--" onwards (the "--" itself included).
    When there is no "--", `rest_args` is an empty list.
    """
    separator = "--"
    if separator not in args:
        return (list(args), [])
    cut = args.index(separator)
    return (list(args[:cut]), args[cut:])
def evaluate(name, code, config_file, stdout='', stderr='', exit = '0', env = None, filemode='', args='', ignore = None):
    """Run one test through the links executable and record the outcome.

    Parameters:
      name        -- test name used in all report lines.
      code        -- test code, passed to links as a single argument.
      config_file -- path given to links as --config=<path>, or None.
      stdout, stderr, exit -- expected standard output, standard error and
                     return code, each compared using check_expected.
      env         -- forwarded unchanged as the `env` argument of Popen.
      filemode    -- unless this starts with 'true', "-e" is added to the
                     links arguments (presumably so that `code` is evaluated
                     as an expression rather than read as a file -- confirm).
      args        -- extra command-line arguments as one string; anything
                     from a "--" token onwards is placed after `code`.
      ignore      -- if not None, a failing test is counted as ignored, with
                     this value printed as the reason.

    The global counters are updated and the result is printed while holding
    the_lock. A test that runs longer than TIMEOUT seconds is killed and
    counted as a failure. Returns None.
    """
    # Imported here because the module header only pulls Popen and PIPE
    # from subprocess.
    from subprocess import TimeoutExpired

    global failures, ignored

    arg_array, rest_array = split_arguments(patch_arguments(str.split(args)))

    if config_file is not None:
        if any(arg.startswith("--config") for arg in arg_array):
            with the_lock:
                print(
                    "FAILURE: Test \"%s\" comes with an args entry to specify the config file," % name +
                    " but a config file was also passed as a command-line argument to the test harness" )
                failures += 1
            return
        arg_array = ["--config=" + config_file] + arg_array

    if not filemode.startswith('true'):
        arg_array += ["-e"]

    # Start links in a new session. This allows us to kill it together with
    # any sub-processes it spawned in case of a timeout.
    proc = Popen([links] + arg_array + [code] + rest_array, stdout=PIPE, stderr=PIPE, env=env, start_new_session=True)
    # Leaving the `with` block closes the pipes and reaps the child.
    with proc:
        try:
            # communicate() drains both pipes while waiting, so a child that
            # produces a lot of output cannot stall on a full pipe.
            out, err = proc.communicate(timeout=TIMEOUT)
        except TimeoutExpired:
            # Kill the whole process group we started earlier due to timeout
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except ProcessLookupError:
                # The process went away between the timeout and the kill.
                pass
            with the_lock:
                failures += 1
                print('!FAILURE: %s [TIMED OUT]' % name)
            return

    # Run all three checks unconditionally so every mismatch is reported.
    errors = []
    passed = check_expected(name, 'return code', str(proc.returncode), exit, errors)
    # Undecodable bytes are replaced so that they show up as an ordinary
    # mismatch instead of aborting the test with a decoding error.
    passed &= check_expected(name, 'stdout', out.decode('utf-8', errors='replace'), stdout, errors)
    passed &= check_expected(name, 'stderr', err.decode('utf-8', errors='replace'), stderr, errors)

    with the_lock:
        if passed:
            OK(name)
        elif ignore is not None:
            ignored += 1
            print('?IGNORED: %s (%s)' % (name, ignore))
        else:
            failures += 1
            print('!FAILURE: %s' % name)
            for headline, detail in errors:
                print(headline)
                print(detail)
def parseTestScript(stream):
    """Yield the header of a test script, then each of its tests.

    If the first line of `stream` is "---", the lines up to the next "---"
    are read as `key: value` pairs and collected into the header dict; the
    closing "---" and the one line following it are then consumed.
    Without such a block the header is empty and nothing is consumed.

    The first item produced is the header dict; the remaining items are
    whatever parse() yields for the rest of the stream.

    Raises SystemExit if a header line does not contain a ':'.
    """
    fence = "---\n"
    header = {}
    if stream.peek_line() == fence:
        # Consume the opening fence; it counts as line 1.
        stream.readline()
        line_num = 1
        while stream.peek_line() != fence:
            raw_line = stream.readline()
            line_num += 1
            fields = [field.strip() for field in raw_line.strip().split(':', 1)]
            if len(fields) != 2:
                raise SystemExit("Syntax error on line " + str(line_num) + ": " + raw_line)
            key, value = fields
            header[key] = value
        # Closing fence, then the single line that follows it.
        stream.readline()
        stream.readline()
    yield header
    yield from parse(stream)
def main():
    """Run every test of the test file named on the command line.

    Usage: run <test file> [<links config file>]

    Each test is passed to evaluate() on a thread pool with one worker per
    CPU. A summary is printed once all tests have finished, and the process
    exits with status 1 if any test failed and 0 otherwise.

    Raises SystemExit on a wrong number of arguments, or when running a test
    raised an exception.
    """
    if len(sys.argv) == 3:
        filename, config_file = sys.argv[1], sys.argv[2]
    elif len(sys.argv) == 2:
        filename, config_file = sys.argv[1], None
    else:
        raise SystemExit('Usage: run <test file> [<links config file>]')

    jobs = []
    with open(filename, 'r') as test_file:
        stream = parseTestScript(TestFileStream(test_file))

        # Process the header, if any.
        header = next(stream)
        # Always remove the "config" key because we combine the
        # header-provided options with the per-test ones later on, and
        # evaluate() accepts the config file via its config_file parameter
        # only. A config file given on the command line takes precedence
        # over the one named in the header.
        header_config = header.pop("config", None)
        if config_file is None:
            config_file = header_config

        cpus = multiprocessing.cpu_count()
        # Leaving the `with` block waits for all submitted jobs to finish.
        with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as tp:
            # Process the body.
            for name, code, opts in stream:
                # Combines the options set in the header with the ones
                # specific to this test case. The latter ones overwrite the
                # ones from the header.
                opts = {**header, **opts}
                jobs.append((name, tp.submit(evaluate, name, code, config_file, **opts)))

    # Check that none of the jobs raised an exception:
    for job_name, result in jobs:
        error = result.exception()
        if error:
            raise SystemExit(("Error: Test \"%s\" failed to execute, producing "
                              "the following exception:\n%s")
                             % (job_name, error))

    print("%d failures (+%d ignored)\n%d successes\n" % (failures, ignored, successes))
    sys.exit(1 if failures > 0 else 0)
# Run the harness only when this file is executed as a script.
if __name__ == '__main__':
    main()