diff --git a/ydb/library/benchmarks/runner/run_tests/run_tests.py b/ydb/library/benchmarks/runner/run_tests/run_tests.py new file mode 100644 index 000000000000..8ccc83e3503f --- /dev/null +++ b/ydb/library/benchmarks/runner/run_tests/run_tests.py @@ -0,0 +1,129 @@ +import argparse +import subprocess +import pathlib +import os +from sys import stderr + + +def variant(string): + if string not in ["h", "ds"]: + raise ValueError("variant must be h or ds") + return string + +def paths(string): + return list(map(pathlib.Path, string.split(";"))) + +def parse_args(passed=None): + + parser = argparse.ArgumentParser() + + parser.add_argument('--datasize', type=int, default=1) + parser.add_argument('--variant', type=variant, default='h') + parser.add_argument('--tasks', type=int, default=1) + + parser.add_argument('--dqrun', type=pathlib.Path) + parser.add_argument('--gen-queries', type=pathlib.Path) + parser.add_argument('--downloaders-dir', type=pathlib.Path) + parser.add_argument('--udfs-dir', type=paths) + parser.add_argument('--fs-cfg', type=pathlib.Path) + parser.add_argument('--flame-graph', type=pathlib.Path) + parser.add_argument('--result-compare', type=pathlib.Path) + parser.add_argument('--gateways-cfg', type=pathlib.Path) + parser.add_argument('--runner-path', type=pathlib.Path) + + parser.add_argument('-o', '--output', default="./results") + parser.add_argument('--clean-old', action="store_true", default=False) + parser.add_argument('--query-filter', action="append", default=[]) + + return parser.parse_args(passed) + +class Runner: + def prepare_queries_dir(self, custom_pragmas): + print("Preparing queries...", file=stderr) + self.queries_dir.mkdir(parents=True, exist_ok=True) + print("queries dir: ", self.queries_dir.resolve(), file=stderr) + cmd = [str(self.args.gen_queries)] + cmd += ["--output", f"{self.queries_dir}"] + cmd += ["--variant", f"{self.args.variant}"] + cmd += ["--syntax", "yql"] + cmd += ["--dataset-size", f"{self.args.datasize}"] + for it in custom_pragmas: + cmd += ["--pragma", it] + print(cmd, file=stderr) + subprocess.run(cmd) + + def prepare_tpc_dir(self): + print("Preparing tpc...", file=stderr) + cmd = [f"{self.args.downloaders_dir}/download_files_{self.args.variant}_{self.args.datasize}.sh"] + print(cmd, file=stderr) + subprocess.run(cmd) + + def __init__(self, args, enable_spilling): + self.args = args + self.enable_spilling = enable_spilling + + self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}").resolve() + if self.args.clean_old or not self.queries_dir.exists(): + self.prepare_queries_dir([ + f"dq.MaxTasksPerStage={self.args.tasks}", + "dq.OptLLVM=ON" + ] + [ + "dq.UseFinalizeByKey=true", + "dq.EnableSpillingNodes=All", + ] if self.enable_spilling else []) + + self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}").resolve() + if self.args.clean_old or not self.tpc_dir.exists(): + self.prepare_tpc_dir() + if not pathlib.Path("./tpc").exists(): + os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True) + + self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve() + self.result_dir.mkdir(parents=True, exist_ok=True) + + def run(self): + cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"] + cmd += ["--perf"] + for it in self.args.query_filter: + cmd += ["--include-q", it] + cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"] + cmd += ["--bindings", f"{str(self.queries_dir)}/{self.args.variant}/bindings.json"] + cmd += ["--result-dir", str(self.result_dir)] + cmd += ["--flame-graph", str(self.flame_graph)] + cmd += [f"{self.args.dqrun}", "-s"] + cmd += ["--enable-spilling"] if self.enable_spilling else [] + cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))] + cmd += ["--fs-cfg", f"{str(self.args.fs_cfg)}"] + cmd += ["--gateways-cfg", f"{str(self.args.gateways_cfg)}"] + subprocess.run(cmd) + + return self.result_dir + +def result_compare(args, to_compare): + cmd = [f"{args.result_compare}"] + cmd += ["-v"] + cmd += to_compare + print(cmd, file=stderr) + with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table: + subprocess.run(cmd, stdout=result_table) + +def run(passed=None): + args = parse_args(passed) + + print(args.query_filter) + + results = [] + print("With spilling...", file=stderr) + results.append(Runner(args, True).run()) + print("No spilling...", file=stderr) + results.append(Runner(args, False).run()) + + print(results, file=stderr) + + result_compare(args, results) + +def main(): + run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ydb/library/benchmarks/runner/run_tests/ya.make b/ydb/library/benchmarks/runner/run_tests/ya.make new file mode 100644 index 000000000000..8242b8b8067d --- /dev/null +++ b/ydb/library/benchmarks/runner/run_tests/ya.make @@ -0,0 +1,10 @@ +PY3_PROGRAM() + +PY_SRCS( + MAIN run_tests.py +) + +PEERDIR( +) + +END() diff --git a/ydb/library/benchmarks/runner/runner/runner.py b/ydb/library/benchmarks/runner/runner/runner.py index a8b7009d32f3..cd13235bdfe6 100755 --- a/ydb/library/benchmarks/runner/runner/runner.py +++ b/ydb/library/benchmarks/runner/runner/runner.py @@ -65,25 +65,25 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5): def main(): - parser = argparse.ArgumentParser() parser.add_argument('--query-dir', type=str, default='q/scalar') parser.add_argument('--bindings', type=str, default='bindings.json') - parser.add_argument('--result-dir', type=str, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now())) + parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now())) parser.add_argument('--timeout', type=int, default=30*60) parser.add_argument('--perf', action='store_true') - parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME'])) + parser.add_argument('--flame-graph', type=Path, default=None) parser.add_argument('--include-q', default=[], action='append') parser.add_argument('--exclude-q', default=[], action='append') + args, argv = parser.parse_known_intermixed_args() + qdir = args.query_dir bindings = args.bindings outdir = args.result_dir assert len(argv) querydir = Path(qdir) - os.makedirs(outdir + '/' + qdir, exist_ok=True) - with open(outdir + '/' + qdir + "/summary.tsv", "w") as outf, \ - open(outdir + '/' + qdir + "/summary.json", "w") as outj: + with open(outdir / "summary.tsv", "w") as outf, \ + open(outdir / "summary.json", "w") as outj: print(' '.join(argv + ['-p', qdir, '--bindings-file', bindings]), file=outf) print(json.dumps({ 'cmdline': argv, @@ -92,12 +92,13 @@ def main(): 'version': 100 }), file=outj) for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))): - q = str(query) - name = outdir + '/' + q + q = str(query.stem) + print(f"{q}", end="", flush=True) + name = str(outdir / q) if len(args.include_q): include = False for r in args.include_q: - if re.search(r, name): + if re.search(r, str(query)): include = True break if not include: @@ -105,13 +106,14 @@ def main(): if len(args.exclude_q): include = True for r in args.exclude_q: - if re.search(r, name): + if re.search(r, str(query)): include = False break if not include: continue print(q, end='\t', file=outf) outname = name + '-result.yson' + print(".", end="", flush=True) exitcode, rusage, elapsed, iostat = run( argv + [ '--result-file', outname, @@ -120,7 +122,7 @@ def main(): '--err-file', name + '-err.txt', '--expr-file', name + '-expr.txt', '--stat', name + '-stat.yson', - '-p', q + '-p', str(query) ], name + '-stdout.txt', name + '-stderr.txt', @@ -164,25 +166,27 @@ def main(): } }), file=outj) outj.flush() + print(".", end="", flush=True) if args.perf: exitcode, rusage, elapsed, iostat = run( - ['{}/ya'.format(args.arc_path), 'tool', 'perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] + + ['/usr/bin/perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] + argv + [ '--result-file', '/dev/null', '--bindings-file', bindings, '--plan-file', '/dev/null', '--err-file', '/dev/null', '--expr-file', '/dev/null', - '-p', q + '-p', str(query) ], name + '-stdout-perf.txt', name + '-stderr-perf.txt', timeout=args.timeout) os.system(''' - {0}/ya tool perf script -i {2}/perf.data --header | - {0}/contrib/tools/flame-graph/stackcollapse-perf.pl | - {0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg - '''.format(args.arc_path, name, outdir)) + perf script -i {2}/perf.data --header | + {0}/stackcollapse-perf.pl | + {0}/flamegraph.pl > {1}.svg + '''.format(args.flame_graph, name, outdir)) + print(".", flush=True) if __name__ == "__main__": diff --git a/ydb/library/benchmarks/runner/runner/ya.make b/ydb/library/benchmarks/runner/runner/ya.make index a2358288f04b..8bb5d555dc5c 100644 --- a/ydb/library/benchmarks/runner/runner/ya.make +++ b/ydb/library/benchmarks/runner/runner/ya.make @@ -4,7 +4,4 @@ PY_SRCS( MAIN runner.py ) -PEERDIR( -) - END() diff --git a/ydb/library/benchmarks/runner/tpc_tests.py b/ydb/library/benchmarks/runner/tpc_tests.py new file mode 100644 index 000000000000..e1f89c21590a --- /dev/null +++ b/ydb/library/benchmarks/runner/tpc_tests.py @@ -0,0 +1,62 @@ +import run_tests.run_tests as run_tests +import yatest.common +import pathlib +import sys + + +class TestRunner: + DEPS = { + "dqrun" : "ydb/library/yql/tools/dqrun", + "gen-queries" : "ydb/library/benchmarks/gen_queries", + "result-compare" : "ydb/library/benchmarks/runner/result_compare", + "runner" : "ydb/library/benchmarks/runner/runner" + } + + DATA = { + "fs-cfg" : "ydb/library/yql/tools/dqrun/examples/fs.conf", + "gateways-cfg" : "ydb/library/benchmarks/runner/runner/test-gateways.conf", + "flame-graph" : "contrib/tools/flame-graph" + } + + UDFS = [ + "ydb/library/yql/udfs/common/set", + "ydb/library/yql/udfs/common/url_base", + "ydb/library/yql/udfs/common/datetime2", + "ydb/library/yql/udfs/common/re2" + ] + + def __init__(self): + self.deps = {name : pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()} + self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS] + self.data = {name : pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items()} + self.output = pathlib.Path(yatest.common.output_path()).resolve() + self.results_path = self.output / "results" + self.results_path.mkdir() + + self.cmd = [] + self.cmd += ["--dqrun", str(self.deps["dqrun"]) + "/dqrun"] + self.cmd += ["--gen-queries", str(self.deps["gen-queries"]) + "/gen_queries"] + self.cmd += ["--result-compare", str(self.deps["result-compare"]) + "/result_compare"] + self.cmd += ["--downloaders-dir", "/home/vladluk/ydbwork/ydb/ydb/library/benchmarks/runner"] + self.cmd += ["--runner", str(self.deps["runner"]) + "/runner"] + self.cmd += ["--flama-graph", str(self.data["flame-graph"])] + self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))] + self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])] + self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])] + self.cmd += ["-o", str(self.results_path)] + + + def wrapped_run(self, variant, datasize, tasks, query_filter): + cmd = self.cmd + cmd += ["--variant", f"{variant}"] + cmd += ["--datasize", f"{datasize}"] + cmd += ["--tasks", f"{tasks}"] + cmd += ["--query-filter", f"{query_filter}"] + print(" ".join(cmd), file=sys.stderr) + run_tests.run(cmd) + + +def test_tpc(): + runner = TestRunner() + runner.wrapped_run("h", 1, 1, r"q1\.sql") + print("results path:", runner.results_path.resolve(), file=sys.stderr) diff --git a/ydb/library/benchmarks/runner/ya.make b/ydb/library/benchmarks/runner/ya.make index 8f96bcc8153c..5d2097120683 100644 --- a/ydb/library/benchmarks/runner/ya.make +++ b/ydb/library/benchmarks/runner/ya.make @@ -1,4 +1,37 @@ +PY3TEST() + +SIZE(MEDIUM) + +PY_SRCS( + run_tests/run_tests.py +) + +TEST_SRCS( + tpc_tests.py +) + +DEPENDS( + ydb/library/yql/tools/dqrun + ydb/library/benchmarks/gen_queries + ydb/library/benchmarks/runner/result_compare + ydb/library/benchmarks/runner/runner + + ydb/library/yql/udfs/common/set + ydb/library/yql/udfs/common/url_base + ydb/library/yql/udfs/common/datetime2 + ydb/library/yql/udfs/common/re2 +) + +DATA( + arcadia/ydb/library/yql/tools/dqrun/examples/fs.conf + arcadia/ydb/library/benchmarks/runner/runner/test-gateways.conf + contrib/tools/flame-graph +) + +END() + RECURSE( + run_tests runner result_convert result_compare