Skip to content

Commit

Permalink
Merge c0db3dd into 4b2a95c
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 authored Aug 27, 2024
2 parents 4b2a95c + c0db3dd commit 2f00d9f
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 20 deletions.
129 changes: 129 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/run_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import argparse
import subprocess
import pathlib
import os
from sys import stderr


def variant(string):
if string not in ["h", "ds"]:
raise ValueError("variant must be h or ds")
return string

def paths(string):
return list(map(pathlib.Path, string.split(";")))

def parse_args(passed=None):

parser = argparse.ArgumentParser()

parser.add_argument('--datasize', type=int, default=1)
parser.add_argument('--variant', type=variant, default='h')
parser.add_argument('--tasks', type=int, default=1)

parser.add_argument('--dqrun', type=pathlib.Path)
parser.add_argument('--gen-queries', type=pathlib.Path)
parser.add_argument('--downloaders-dir', type=pathlib.Path)
parser.add_argument('--udfs-dir', type=paths)
parser.add_argument('--fs-cfg', type=pathlib.Path)
parser.add_argument('--flame-graph', type=pathlib.Path)
parser.add_argument('--result-compare', type=pathlib.Path)
parser.add_argument('--gateways-cfg', type=pathlib.Path)
parser.add_argument('--runner-path', type=pathlib.Path)

parser.add_argument('-o', '--output', default="./results")
parser.add_argument('--clean-old', action="store_true", default=False)
parser.add_argument('--query-filter', action="append", default=[])

return parser.parse_args(passed)

class Runner:
def prepare_queries_dir(self, custom_pragmas):
print("Preparing queries...", file=stderr)
self.queries_dir.mkdir(parents=True, exist_ok=True)
print("queries dir: ", self.queries_dir.resolve(), file=stderr)
cmd = [str(self.args.gen_queries)]
cmd += ["--output", f"{self.queries_dir}"]
cmd += ["--variant", f"{self.args.variant}"]
cmd += ["--syntax", "yql"]
cmd += ["--dataset-size", f"{self.args.datasize}"]
for it in custom_pragmas:
cmd += ["--pragma", it]
print(cmd, file=stderr)
subprocess.run(cmd)

def prepare_tpc_dir(self):
print("Preparing tpc...", file=stderr)
cmd = [f"{self.args.downloaders_dir}/download_files_{self.args.variant}_{self.args.datasize}.sh"]
print(cmd, file=stderr)
subprocess.run(cmd)

def __init__(self, args, enable_spilling):
self.args = args
self.enable_spilling = enable_spilling

self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}").resolve()
if self.args.clean_old or not self.queries_dir.exists():
self.prepare_queries_dir([
f"dq.MaxTasksPerStage={self.args.tasks}",
"dq.OptLLVM=ON"
] + [
"dq.UseFinalizeByKey=true",
"dq.EnableSpillingNodes=All",
] if self.enable_spilling else [])

self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}").resolve()
if self.args.clean_old or not self.tpc_dir.exists():
self.prepare_tpc_dir()
if not pathlib.Path("./tpc").exists():
os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True)

self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve()
self.result_dir.mkdir(parents=True, exist_ok=True)

def run(self):
cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"]
cmd += ["--perf"]
for it in self.args.query_filter:
cmd += ["--include-q", it]
cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"]
cmd += ["--bindings", f"{str(self.queries_dir)}/{self.args.variant}/bindings.json"]
cmd += ["--result-dir", str(self.result_dir)]
cmd += ["--flame-graph", str(self.flame_graph)]
cmd += [f"{self.args.dqrun}", "-s"]
cmd += ["--enable-spilling"] if self.enable_spilling else []
cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
cmd += ["--fs-cfg", f"{str(self.args.fs_cfg)}"]
cmd += ["--gateways-cfg", f"{str(self.args.gateways_cfg)}"]
subprocess.run(cmd)

return self.result_dir

def result_compare(args, to_compare):
cmd = [f"{args.result_compare}"]
cmd += ["-v"]
cmd += to_compare
print(cmd, file=stderr)
with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table:
subprocess.run(cmd, stdout=result_table)

def run(passed=None):
args = parse_args(passed)

print(args.query_filter)

results = []
print("With spilling...", file=stderr)
results.append(Runner(args, True).run())
print("No spilling...", file=stderr)
results.append(Runner(args, False).run())

print(results, file=stderr)

result_compare(args, results)

def main():
run()

if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
PY3_PROGRAM()

PY_SRCS(
MAIN run_tests.py
)

PEERDIR(
)

END()
38 changes: 21 additions & 17 deletions ydb/library/benchmarks/runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):


def main():

parser = argparse.ArgumentParser()
parser.add_argument('--query-dir', type=str, default='q/scalar')
parser.add_argument('--bindings', type=str, default='bindings.json')
parser.add_argument('--result-dir', type=str, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--timeout', type=int, default=30*60)
parser.add_argument('--perf', action='store_true')
parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME']))
parser.add_argument('--flame-graph', type=Path, default=None)
parser.add_argument('--include-q', default=[], action='append')
parser.add_argument('--exclude-q', default=[], action='append')

args, argv = parser.parse_known_intermixed_args()

qdir = args.query_dir
bindings = args.bindings
outdir = args.result_dir
assert len(argv)
querydir = Path(qdir)
os.makedirs(outdir + '/' + qdir, exist_ok=True)
with open(outdir + '/' + qdir + "/summary.tsv", "w") as outf, \
open(outdir + '/' + qdir + "/summary.json", "w") as outj:
with open(outdir / "summary.tsv", "w") as outf, \
open(outdir / "summary.json", "w") as outj:
print(' '.join(argv + ['-p', qdir, '--bindings-file', bindings]), file=outf)
print(json.dumps({
'cmdline': argv,
Expand All @@ -92,26 +92,28 @@ def main():
'version': 100
}), file=outj)
for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))):
q = str(query)
name = outdir + '/' + q
q = str(query.stem)
print(f"{q}", end="", flush=True)
name = str(outdir / q)
if len(args.include_q):
include = False
for r in args.include_q:
if re.search(r, name):
if re.search(r, str(query)):
include = True
break
if not include:
continue
if len(args.exclude_q):
include = True
for r in args.exclude_q:
if re.search(r, name):
if re.search(r, str(query)):
include = False
break
if not include:
continue
print(q, end='\t', file=outf)
outname = name + '-result.yson'
print(".", end="", flush=True)
exitcode, rusage, elapsed, iostat = run(
argv + [
'--result-file', outname,
Expand All @@ -120,7 +122,7 @@ def main():
'--err-file', name + '-err.txt',
'--expr-file', name + '-expr.txt',
'--stat', name + '-stat.yson',
'-p', q
'-p', str(query)
],
name + '-stdout.txt',
name + '-stderr.txt',
Expand Down Expand Up @@ -164,25 +166,27 @@ def main():
}
}), file=outj)
outj.flush()
print(".", end="", flush=True)
if args.perf:
exitcode, rusage, elapsed, iostat = run(
['{}/ya'.format(args.arc_path), 'tool', 'perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
['/usr/bin/perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
argv + [
'--result-file', '/dev/null',
'--bindings-file', bindings,
'--plan-file', '/dev/null',
'--err-file', '/dev/null',
'--expr-file', '/dev/null',
'-p', q
'-p', str(query)
],
name + '-stdout-perf.txt',
name + '-stderr-perf.txt',
timeout=args.timeout)
os.system('''
{0}/ya tool perf script -i {2}/perf.data --header |
{0}/contrib/tools/flame-graph/stackcollapse-perf.pl |
{0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg
'''.format(args.arc_path, name, outdir))
perf script -i {2}/perf.data --header |
{0}/stackcollapse-perf.pl |
{0}/flamegraph.pl > {1}.svg
'''.format(args.flame_graph, name, outdir))
print(".", flush=True)


if __name__ == "__main__":
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/benchmarks/runner/runner/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@ PY_SRCS(
MAIN runner.py
)

PEERDIR(
)

END()
62 changes: 62 additions & 0 deletions ydb/library/benchmarks/runner/tpc_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import run_tests.run_tests as run_tests
import yatest.common
import pathlib
import sys


class TestRunner:
DEPS = {
"dqrun" : "ydb/library/yql/tools/dqrun",
"gen-queries" : "ydb/library/benchmarks/gen_queries",
"result-compare" : "ydb/library/benchmarks/runner/result_compare",
"runner" : "ydb/library/benchmarks/runner/runner"
}

DATA = {
"fs-cfg" : "ydb/library/yql/tools/dqrun/examples/fs.conf",
"gateways-cfg" : "ydb/library/benchmarks/runner/runner/test-gateways.conf",
"flame-graph" : "contrib/tools/flame-graph"
}

UDFS = [
"ydb/library/yql/udfs/common/set",
"ydb/library/yql/udfs/common/url_base",
"ydb/library/yql/udfs/common/datetime2",
"ydb/library/yql/udfs/common/re2"
]

def __init__(self):
self.deps = {name : pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()}
self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS]
self.data = {name : pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items()}
self.output = pathlib.Path(yatest.common.output_path()).resolve()
self.results_path = self.output / "results"
self.results_path.mkdir()

self.cmd = []
self.cmd += ["--dqrun", str(self.deps["dqrun"]) + "/dqrun"]
self.cmd += ["--gen-queries", str(self.deps["gen-queries"]) + "/gen_queries"]
self.cmd += ["--result-compare", str(self.deps["result-compare"]) + "/result_compare"]
self.cmd += ["--downloaders-dir", "/home/vladluk/ydbwork/ydb/ydb/library/benchmarks/runner"]
self.cmd += ["--runner", str(self.deps["runner"]) + "/runner"]
self.cmd += ["--flama-graph", str(self.data["flame-graph"])]
self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))]
self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])]
self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])]
self.cmd += ["-o", str(self.results_path)]


def wrapped_run(self, variant, datasize, tasks, query_filter):
cmd = self.cmd
cmd += ["--variant", f"{variant}"]
cmd += ["--datasize", f"{datasize}"]
cmd += ["--tasks", f"{tasks}"]
cmd += ["--query-filter", f"{query_filter}"]
print(" ".join(cmd), file=sys.stderr)
run_tests.run(cmd)


def test_tpc():
runner = TestRunner()
runner.wrapped_run("h", 1, 1, r"q1\.sql")
print("results path:", runner.results_path.resolve(), file=sys.stderr)
33 changes: 33 additions & 0 deletions ydb/library/benchmarks/runner/ya.make
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
PY3TEST()

SIZE(MEDIUM)

PY_SRCS(
run_tests/run_tests.py
)

TEST_SRCS(
tpc_tests.py
)

DEPENDS(
ydb/library/yql/tools/dqrun
ydb/library/benchmarks/gen_queries
ydb/library/benchmarks/runner/result_compare
ydb/library/benchmarks/runner/runner

ydb/library/yql/udfs/common/set
ydb/library/yql/udfs/common/url_base
ydb/library/yql/udfs/common/datetime2
ydb/library/yql/udfs/common/re2
)

DATA(
arcadia/ydb/library/yql/tools/dqrun/examples/fs.conf
arcadia/ydb/library/benchmarks/runner/runner/test-gateways.conf
contrib/tools/flame-graph
)

END()

RECURSE(
run_tests
runner
result_convert
result_compare
Expand Down

0 comments on commit 2f00d9f

Please sign in to comment.