Skip to content

Commit

Permalink
Merge 6e14f85 into be39b5a
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 authored Sep 3, 2024
2 parents be39b5a + 6e14f85 commit 0bcaedf
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 21 deletions.
162 changes: 162 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/run_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import argparse
import subprocess
import pathlib
import os
from sys import stderr


def variant(string):
if string not in ["h", "ds"]:
raise ValueError("variant must be h or ds")
return string


def paths(string):
return list(map(pathlib.Path, string.split(";")))


def parse_args():
subparser = argparse.ArgumentParser()

subparser.add_argument('--is-test', action="store_true", default=False)

subparser.add_argument('--datasize', type=int, default=1)
subparser.add_argument('--variant', type=variant, default='h')
subparser.add_argument('--tasks', type=int, default=1)

subparser.add_argument('-o', '--output', default="./results")
subparser.add_argument('--clean-old', action="store_true", default=False)
subparser.add_argument('--query-filter', action="append", default=[])

args, argv = subparser.parse_known_args()

if args.is_test:
parser = argparse.ArgumentParser()

parser.add_argument('--dqrun', type=pathlib.Path)
parser.add_argument('--gen-queries', type=pathlib.Path)
parser.add_argument('--downloaders-dir', type=pathlib.Path)
parser.add_argument('--udfs-dir', type=paths)
parser.add_argument('--fs-cfg', type=pathlib.Path)
parser.add_argument('--flame-graph', type=pathlib.Path)
parser.add_argument('--result-compare', type=pathlib.Path)
parser.add_argument('--gateways-cfg', type=pathlib.Path)
parser.add_argument('--runner-path', type=pathlib.Path)

return parser.parse_args(argv, namespace=args)
else:
parser = argparse.ArgumentParser()

parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default="../../../../")

args = parser.parse_args(argv, namespace=args)

args.dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun"
args.gen_queries = args.ydb_root / "ydb" / "library" / "benchmarks" / "gen_queries" / "gen_queries"
args.downloaders_dir = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner"
args.fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
args.flame_graph = args.ydb_root / "contrib" / "tools" / "flame-graph"
args.result_compare = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare"
args.gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"
args.runner_path = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "runner"

udfs_prefix = args.ydb_root / "ydb" / "library" / "yql" / "udfs" / "common"
args.udfs_dir = [udfs_prefix / name for name in ["set", "url_base", "datetime2", "re2", "math", "unicode_base"]]

return args


class Runner:
def prepare_queries_dir(self, custom_pragmas):
print("Preparing queries...", file=stderr)
self.queries_dir.mkdir(parents=True, exist_ok=True)
cmd = [str(self.args.gen_queries)]
cmd += ["--output", f"{self.queries_dir}"]
cmd += ["--variant", f"{self.args.variant}"]
cmd += ["--syntax", "yql"]
cmd += ["--dataset-size", f"{self.args.datasize}"]
for it in custom_pragmas:
cmd += ["--pragma", it]
res = subprocess.run(cmd)
if res.returncode != 0:
raise OSError("Failed to prepare queries")

def prepare_tpc_dir(self):
print("Preparing tpc...", file=stderr)
cmd = [f"./download_files_{self.args.variant}_{self.args.datasize}.sh"]
res = subprocess.run(cmd)
if res.returncode != 0:
raise OSError("Failed to prepare tpc")

def __init__(self, args, enable_spilling):
self.args = args
self.enable_spilling = enable_spilling

self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}")
if self.args.clean_old or not self.queries_dir.exists():
self.prepare_queries_dir([
f"dq.MaxTasksPerStage={self.args.tasks}",
"dq.OptLLVM=ON"
] + [
"dq.UseFinalizeByKey=true",
"dq.EnableSpillingNodes=All",
] if self.enable_spilling else [])
self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}")
print(self.tpc_dir, file=stderr)
if self.args.clean_old or not self.tpc_dir.exists():
path = os.getcwd()
os.chdir(self.args.downloaders_dir)
print(os.getcwd(), file=stderr)
self.prepare_tpc_dir()
os.chdir(path)

self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve()
self.result_dir.mkdir(parents=True, exist_ok=True)

def run(self):
cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"]
# cmd += ["--perf"]
for it in self.args.query_filter:
cmd += ["--include-q", it]
cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"]
cmd += ["--bindings", f"{str(self.queries_dir)}/{self.args.variant}/bindings.json"]
cmd += ["--result-dir", str(self.result_dir)]
cmd += ["--flame-graph", str(self.args.flame_graph)]
cmd += [f"{self.args.dqrun}", "-s"]
cmd += ["--enable-spilling"] if self.enable_spilling else []
cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
cmd += ["--fs-cfg", f"{str(self.args.fs_cfg)}"]
cmd += ["--gateways-cfg", f"{str(self.args.gateways_cfg)}"]
print("Running runner...", file=stderr)
subprocess.run(cmd)

print("Run results at: ", self.result_dir)
return self.result_dir


def result_compare(args, to_compare):
print("Comparing...")
cmd = [f"{args.result_compare}"]
cmd += ["-v"]
cmd += to_compare
with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table:
res = subprocess.run(cmd, stdout=result_table)
if res.returncode != 0:
raise OSError("Failed to compare result")


def main():
args = parse_args()

results = []
print("With spilling...", file=stderr)
results.append(Runner(args, True).run())
print("No spilling...", file=stderr)
results.append(Runner(args, False).run())

if not args.is_test:
result_compare(args, results)


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
PY3_PROGRAM()

PY_SRCS(
MAIN run_tests.py
)

PEERDIR(
)

END()
40 changes: 22 additions & 18 deletions ydb/library/benchmarks/runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):
oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGCHLD})
try:
start_time = time_ns()
pid = os.posix_spawn(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
pid = os.posix_spawnp(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
([(os.POSIX_SPAWN_OPEN, 1, out, os.O_WRONLY | os.O_CREAT, 0o666)] if out else []) +
([(os.POSIX_SPAWN_OPEN, 2, err, os.O_WRONLY | os.O_CREAT, 0o666)] if err else [])
))
Expand Down Expand Up @@ -65,25 +65,25 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):


def main():

parser = argparse.ArgumentParser()
parser.add_argument('--query-dir', type=str, default='q/scalar')
parser.add_argument('--bindings', type=str, default='bindings.json')
parser.add_argument('--result-dir', type=str, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--timeout', type=int, default=30*60)
parser.add_argument('--perf', action='store_true')
parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME']))
parser.add_argument('--flame-graph', type=Path, default=None)
parser.add_argument('--include-q', default=[], action='append')
parser.add_argument('--exclude-q', default=[], action='append')

args, argv = parser.parse_known_intermixed_args()

qdir = args.query_dir
bindings = args.bindings
outdir = args.result_dir
assert len(argv)
querydir = Path(qdir)
os.makedirs(outdir + '/' + qdir, exist_ok=True)
with open(outdir + '/' + qdir + "/summary.tsv", "w") as outf, \
open(outdir + '/' + qdir + "/summary.json", "w") as outj:
with open(outdir / "summary.tsv", "w") as outf, \
open(outdir / "summary.json", "w") as outj:
print(' '.join(argv + ['-p', qdir, '--bindings-file', bindings]), file=outf)
print(json.dumps({
'cmdline': argv,
Expand All @@ -92,26 +92,28 @@ def main():
'version': 100
}), file=outj)
for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))):
q = str(query)
name = outdir + '/' + q
q = str(query.stem)
print(f"{q}", end="", flush=True)
name = str(outdir / q)
if len(args.include_q):
include = False
for r in args.include_q:
if re.search(r, name):
if re.search(r, str(query)):
include = True
break
if not include:
continue
if len(args.exclude_q):
include = True
for r in args.exclude_q:
if re.search(r, name):
if re.search(r, str(query)):
include = False
break
if not include:
continue
print(q, end='\t', file=outf)
outname = name + '-result.yson'
print(".", end="", flush=True)
exitcode, rusage, elapsed, iostat = run(
argv + [
'--result-file', outname,
Expand All @@ -120,7 +122,7 @@ def main():
'--err-file', name + '-err.txt',
'--expr-file', name + '-expr.txt',
'--stat', name + '-stat.yson',
'-p', q
'-p', str(query)
],
name + '-stdout.txt',
name + '-stderr.txt',
Expand Down Expand Up @@ -164,25 +166,27 @@ def main():
}
}), file=outj)
outj.flush()
print(".", end="", flush=True)
if args.perf:
exitcode, rusage, elapsed, iostat = run(
['{}/ya'.format(args.arc_path), 'tool', 'perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
['perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
argv + [
'--result-file', '/dev/null',
'--bindings-file', bindings,
'--plan-file', '/dev/null',
'--err-file', '/dev/null',
'--expr-file', '/dev/null',
'-p', q
'-p', str(query)
],
name + '-stdout-perf.txt',
name + '-stderr-perf.txt',
timeout=args.timeout)
os.system('''
{0}/ya tool perf script -i {2}/perf.data --header |
{0}/contrib/tools/flame-graph/stackcollapse-perf.pl |
{0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg
'''.format(args.arc_path, name, outdir))
perf script -i {2}/perf.data --header |
{0}/stackcollapse-perf.pl |
{0}/flamegraph.pl > {1}.svg
'''.format(args.flame_graph, name, outdir))
print(".", flush=True)


if __name__ == "__main__":
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/benchmarks/runner/runner/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@ PY_SRCS(
MAIN runner.py
)

PEERDIR(
)

END()
81 changes: 81 additions & 0 deletions ydb/library/benchmarks/runner/tpc_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import yatest.common
import pathlib
import sys
import os


class Runner:
DEPS = {
"run_tests" : "ydb/library/benchmarks/runner/run_tests",
"dqrun" : "ydb/library/yql/tools/dqrun",
"gen-queries" : "ydb/library/benchmarks/gen_queries",
"result-compare" : "ydb/library/benchmarks/runner/result_compare",
"runner" : "ydb/library/benchmarks/runner/runner"
}

DATA = {
"fs-cfg" : "ydb/library/yql/tools/dqrun/examples/fs.conf",
"gateways-cfg" : "ydb/library/benchmarks/runner/runner/test-gateways.conf",
"flame-graph" : "contrib/tools/flame-graph",
"downloaders-dir" : "ydb/library/benchmarks/runner",
}

UDFS = [
"ydb/library/yql/udfs/common/set",
"ydb/library/yql/udfs/common/url_base",
"ydb/library/yql/udfs/common/datetime2",
"ydb/library/yql/udfs/common/re2"
]

def __init__(self):
self.deps = {name : pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()}
self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS]
self.data = {name : pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items() if name}
self.output = pathlib.Path(yatest.common.output_path())
self.results_path = self.output / "results"
self.results_path.mkdir()

self.cmd = [str(self.deps["run_tests"]) + "/run_tests", "--is-test"]
self.cmd += ["--dqrun", str(self.deps["dqrun"]) + "/dqrun"]
self.cmd += ["--gen-queries", str(self.deps["gen-queries"]) + "/gen_queries"]
self.cmd += ["--result-compare", str(self.deps["result-compare"]) + "/result_compare"]
self.cmd += ["--downloaders-dir", str(self.data["downloaders-dir"])]
self.cmd += ["--runner", str(self.deps["runner"]) + "/runner"]
self.cmd += ["--flame-graph", str(self.data["flame-graph"])]
self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))]
self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])]
self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])]
self.cmd += ["-o", str(self.results_path)]

def wrapped_run(self, variant, datasize, tasks, query_filter):
cmd = self.cmd
cmd += ["--variant", f"{variant}"]
cmd += ["--datasize", f"{datasize}"]
cmd += ["--tasks", f"{tasks}"]
if query_filter:
cmd += ["--query-filter", f"{query_filter}"]
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def upload(result_path, s3_folder):
uploader = pathlib.Path(yatest.common.source_path("ydb/library/benchmarks/runner/upload_results.py")).resolve()
cmd = ["python3", str(uploader)]
cmd += ["--result-path", str(result_path)]
cmd += ["--s3-folder", str(s3_folder)]
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def test_tpc():
is_ci = os.environ.get("PUBLIC_DIR") is not None

runner = Runner()
runner.wrapped_run("h", 1, 1, r"q1\.sql")
result_path = runner.results_path.resolve()
print("Results path: ", result_path, file=sys.stderr)

if is_ci:
s3_folder = pathlib.Path(os.environ["PUBLIC_DIR"]).resolve()

upload(result_path, s3_folder)

exit(1)
Loading

0 comments on commit 0bcaedf

Please sign in to comment.