Skip to content

Commit

Permalink
(Broken) making test properly in-progress
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 committed Aug 27, 2024
1 parent 44e0a8e commit c0db3dd
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 65 deletions.
75 changes: 44 additions & 31 deletions ydb/library/benchmarks/runner/run_tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@
import os
from sys import stderr


def variant(string):
    """Validate a benchmark variant name for use as an argparse ``type=``.

    Returns ``string`` unchanged when it is ``"h"`` or ``"ds"``; raises
    ``ValueError`` for anything else.
    """
    if string in ("h", "ds"):
        return string
    raise ValueError("variant must be h or ds")

def paths(string):
    """Parse a ``;``-separated list of filesystem paths.

    Args:
        string: Paths joined with ``;`` (the format produced by
            ``";".join(map(str, ...))``).

    Returns:
        A list of ``pathlib.Path`` objects in input order. Empty segments
        (an empty string, a doubled or trailing ``;``) are skipped, because
        ``pathlib.Path("")`` would otherwise silently turn into ``Path(".")``.
    """
    return [pathlib.Path(part) for part in string.split(";") if part]

def parse_args(passed=None):
YDB_ROOT = "../../../"

def variant(string):
if string not in ["h", "ds"]:
raise ValueError("variant must be h or ds")
return string

parser = argparse.ArgumentParser()

parser.add_argument('--datasize', type=int, default=1)
parser.add_argument('--variant', type=variant, default='h')
parser.add_argument('--tasks', type=int, default=1)
parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default=YDB_ROOT)

parser.add_argument('--dqrun', type=pathlib.Path)
parser.add_argument('--gen-queries', type=pathlib.Path)
parser.add_argument('--downloaders-dir', type=pathlib.Path)
parser.add_argument('--udfs-dir', type=paths)
parser.add_argument('--fs-cfg', type=pathlib.Path)
parser.add_argument('--flame-graph', type=pathlib.Path)
parser.add_argument('--result-compare', type=pathlib.Path)
parser.add_argument('--gateways-cfg', type=pathlib.Path)
parser.add_argument('--runner-path', type=pathlib.Path)

parser.add_argument('-o', '--output', default="./results")
parser.add_argument('--clean-old', action="store_true", default=False)
parser.add_argument('--query-filter', action="append", default=[])
Expand All @@ -26,28 +39,30 @@ def variant(string):

class Runner:
def prepare_queries_dir(self, custom_pragmas):
    """Generate the benchmark queries into ``self.queries_dir``.

    Creates the directory if needed and invokes the ``gen_queries`` tool
    (``self.args.gen_queries``) with the configured variant and dataset size.

    Args:
        custom_pragmas: Iterable of pragma strings; each one is forwarded
            as a separate ``--pragma`` option.
    """
    # Progress output goes to stderr only, so stdout stays free of noise.
    print("Preparing queries...", file=stderr)
    self.queries_dir.mkdir(parents=True, exist_ok=True)
    print("queries dir: ", self.queries_dir.resolve(), file=stderr)

    # Every argv element is converted to str explicitly before the call.
    cmd = [str(self.args.gen_queries)]
    cmd += ["--output", str(self.queries_dir)]
    cmd += ["--variant", str(self.args.variant)]
    cmd += ["--syntax", "yql"]
    cmd += ["--dataset-size", str(self.args.datasize)]
    for pragma in custom_pragmas:
        cmd += ["--pragma", pragma]

    print(cmd, file=stderr)
    subprocess.run(cmd)

def prepare_tpc_dir(self):
    """Download the TPC dataset and expose it as ``./tpc``.

    Runs ``download_files_<variant>_<datasize>.sh`` from
    ``self.args.downloaders_dir`` and then makes sure a ``./tpc`` symlink
    pointing at ``<downloaders_dir>/tpc`` exists in the current directory.
    """
    print("Preparing tpc...", file=stderr)
    cmd = [f"{self.args.downloaders_dir}/download_files_{self.args.variant}_{self.args.datasize}.sh"]
    print(cmd, file=stderr)
    subprocess.run(cmd)

    # Create the link only when nothing is there yet: os.symlink raises
    # FileExistsError otherwise, which would break every re-run.
    # lexists() also detects a dangling symlink left by a previous run.
    link = pathlib.Path("./tpc")
    if not os.path.lexists(link):
        os.symlink(f"{self.args.downloaders_dir}/tpc", link, target_is_directory=True)

def __init__(self, args, enable_spilling):
self.args = args
self.enable_spilling = enable_spilling

self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}")
self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}").resolve()
if self.args.clean_old or not self.queries_dir.exists():
self.prepare_queries_dir([
f"dq.MaxTasksPerStage={self.args.tasks}",
Expand All @@ -60,23 +75,26 @@ def __init__(self, args, enable_spilling):
self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}").resolve()
if self.args.clean_old or not self.tpc_dir.exists():
self.prepare_tpc_dir()
if not pathlib.Path("./tpc").exists():
os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True)

self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve()
self.result_dir.mkdir(parents=True, exist_ok=True)

def run(self):
    """Run the benchmark runner under ``/usr/bin/time`` and return the result dir.

    The command line has two parts: options for the runner script
    (``--perf``, ``--include-q``, ``--query-dir``, ``--bindings``,
    ``--result-dir``, ``--flame-graph``), followed by the ``dqrun`` binary
    and the options meant for it.

    Returns:
        ``self.result_dir``, the directory the runner was told to write to.
    """
    cmd = ["/usr/bin/time", str(self.args.runner_path)]
    cmd += ["--perf"]
    # Each query filter is forwarded as one --include-q pattern.
    for pattern in self.args.query_filter:
        cmd += ["--include-q", pattern]

    variant_dir = f"{self.queries_dir}/{self.args.variant}"
    cmd += ["--query-dir", variant_dir]
    cmd += ["--bindings", f"{variant_dir}/bindings.json"]
    cmd += ["--result-dir", str(self.result_dir)]
    # The flame-graph location is a parsed argument, not a Runner attribute;
    # skip the option entirely when it was not supplied instead of passing
    # the literal string "None".
    if self.args.flame_graph is not None:
        cmd += ["--flame-graph", str(self.args.flame_graph)]

    # Everything from here on is the dqrun command line.
    cmd += [str(self.args.dqrun), "-s"]
    if self.enable_spilling:
        cmd += ["--enable-spilling"]
    cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
    cmd += ["--fs-cfg", str(self.args.fs_cfg)]
    cmd += ["--gateways-cfg", str(self.args.gateways_cfg)]
    subprocess.run(cmd)

    return self.result_dir
Expand All @@ -89,16 +107,8 @@ def result_compare(args, to_compare):
with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table:
subprocess.run(cmd, stdout=result_table)

def main(passed=None):
args, _ = parse_args(passed)
args.dqrun = args.ydb_root / "library" / "yql" / "tools" / "dqrun" / "dqrun"
args.gen_queries = args.ydb_root / "library" / "benchmarks" / "gen_queries" / "gen_queries"
args.downloaders_dir = args.ydb_root / "library" / "benchmarks" / "runner"
args.udfs_dir = args.ydb_root / "library" / "yql" / "udfs" / "common"
args.fs_cfg = args.ydb_root / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
args.result_compare = args.ydb_root / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare"
args.gateways_cfg = args.ydb_root / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"
args.runner_path = args.ydb_root / "library" / "benchmarks" / "runner" / "runner" / "runner"
def run(passed=None):
args = parse_args(passed)

print(args.query_filter)

Expand All @@ -112,5 +122,8 @@ def main(passed=None):

result_compare(args, results)

def main():
    """Script entry point: call ``run()`` without an explicit argument list."""
    run()

if __name__ == "__main__":
    # Invoke the entry point exactly once; a second call would repeat the
    # whole benchmark run.
    main()
22 changes: 9 additions & 13 deletions ydb/library/benchmarks/runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ def main():
parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--timeout', type=int, default=30*60)
parser.add_argument('--perf', action='store_true')
parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME']))
parser.add_argument('--flame-graph', type=Path, default=None)
parser.add_argument('--include-q', default=[], action='append')
parser.add_argument('--exclude-q', default=[], action='append')
parser.add_argument('--query-filter', action="append", default=[])


args, argv = parser.parse_known_intermixed_args()

qdir = args.query_dir
bindings = args.bindings
outdir = args.result_dir
Expand All @@ -93,24 +93,20 @@ def main():
}), file=outj)
for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))):
q = str(query.stem)
# q<num>.sql
num = q[1:-4]
if args.query_filter != [] and num not in args.query_filter:
continue
print(f"{q}", end="", flush=True)
name = str(outdir / q)
if len(args.include_q):
include = False
for r in args.include_q:
if re.search(r, name):
if re.search(r, str(query)):
include = True
break
if not include:
continue
if len(args.exclude_q):
include = True
for r in args.exclude_q:
if re.search(r, name):
if re.search(r, str(query)):
include = False
break
if not include:
Expand Down Expand Up @@ -186,10 +182,10 @@ def main():
name + '-stderr-perf.txt',
timeout=args.timeout)
os.system('''
/usr/bin/perf script -i {2}/perf.data --header |
{0}/contrib/tools/flame-graph/stackcollapse-perf.pl |
{0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg
'''.format(args.arc_path, name, outdir))
perf script -i {2}/perf.data --header |
{0}/stackcollapse-perf.pl |
{0}/flamegraph.pl > {1}.svg
'''.format(args.flame_graph, name, outdir))
print(".", flush=True)


Expand Down
10 changes: 0 additions & 10 deletions ydb/library/benchmarks/runner/runner/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,4 @@ PY_SRCS(
MAIN runner.py
)

PEERDIR(
ydb/library/yql/tools/dqrun
ydb/library/benchmarks/gen_queries

ydb/library/yql/udfs/common/set
ydb/library/yql/udfs/common/url_base
ydb/library/yql/udfs/common/datetime2
ydb/library/yql/udfs/common/re2
)

END()
68 changes: 57 additions & 11 deletions ydb/library/benchmarks/runner/tpc_tests.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,62 @@
import run_tests.run_tests as run_tests
import yatest.common
import pathlib
import sys


def wrapped_run(variant, datasize, tasks):
    """Run the benchmark for one configuration, restricted to query "1".

    Args:
        variant: Benchmark variant, forwarded as ``--variant``.
        datasize: Dataset size, forwarded as ``--datasize``.
        tasks: Task count, forwarded as ``--tasks``.
    """
    cmd = []
    cmd += ["--variant", f"{variant}"]
    cmd += ["--datasize", f"{datasize}"]
    cmd += ["--tasks", f"{tasks}"]
    cmd += ["--query-filter", "1"]
    # NOTE(review): the two paths below are specific to one developer
    # machine; they need to be derived from the test environment.
    cmd += ["--ydb-root", "/home/vladluk/ydbwork/ydb/ydb"]
    cmd += ["-o", "/home/vladluk/ydbwork/ydb/ydb/library/benchmarks/runner/results"]
    # run_tests.main() takes no parameters; run() is the entry point that
    # accepts an explicit argument list.
    run_tests.run(cmd)
class TestRunner:
    """Builds the ``run_tests`` command line from test-environment paths.

    Binary dependencies are resolved with ``yatest.common.binary_path``,
    data files with ``yatest.common.source_path``, and results are written
    below ``yatest.common.output_path()``.
    """

    # The name starts with "Test" but this is a helper with an __init__,
    # not a test class; tell pytest not to try collecting it.
    __test__ = False

    # Option name -> directory of the built tool.
    DEPS = {
        "dqrun": "ydb/library/yql/tools/dqrun",
        "gen-queries": "ydb/library/benchmarks/gen_queries",
        "result-compare": "ydb/library/benchmarks/runner/result_compare",
        "runner": "ydb/library/benchmarks/runner/runner",
    }

    # Option name -> source-tree data path.
    DATA = {
        "fs-cfg": "ydb/library/yql/tools/dqrun/examples/fs.conf",
        "gateways-cfg": "ydb/library/benchmarks/runner/runner/test-gateways.conf",
        "flame-graph": "contrib/tools/flame-graph",
    }

    # UDF directories, passed to --udfs-dir joined with ";".
    UDFS = [
        "ydb/library/yql/udfs/common/set",
        "ydb/library/yql/udfs/common/url_base",
        "ydb/library/yql/udfs/common/datetime2",
        "ydb/library/yql/udfs/common/re2",
    ]

    # NOTE(review): machine-specific absolute path; it should be derived
    # from the test environment. Kept as-is so behavior is unchanged, but
    # hoisted here so it is visible and overridable.
    DOWNLOADERS_DIR = "/home/vladluk/ydbwork/ydb/ydb/library/benchmarks/runner"

    def __init__(self):
        """Resolve all dependency paths and assemble the base argument list."""
        self.deps = {name: pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()}
        self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS]
        self.data = {name: pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items()}
        self.output = pathlib.Path(yatest.common.output_path()).resolve()
        self.results_path = self.output / "results"
        # Tolerate an existing directory so a second TestRunner (or a
        # re-run in the same output dir) does not fail.
        self.results_path.mkdir(parents=True, exist_ok=True)

        # Base options shared by every run; wrapped_run() copies this list.
        self.cmd = []
        self.cmd += ["--dqrun", str(self.deps["dqrun"] / "dqrun")]
        self.cmd += ["--gen-queries", str(self.deps["gen-queries"] / "gen_queries")]
        self.cmd += ["--result-compare", str(self.deps["result-compare"] / "result_compare")]
        self.cmd += ["--downloaders-dir", self.DOWNLOADERS_DIR]
        # Full option names, exactly as run_tests.parse_args declares them.
        self.cmd += ["--runner-path", str(self.deps["runner"] / "runner")]
        self.cmd += ["--flame-graph", str(self.data["flame-graph"])]
        self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))]
        self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])]
        self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])]
        self.cmd += ["-o", str(self.results_path)]

    def wrapped_run(self, variant, datasize, tasks, query_filter):
        """Run the benchmark once for the given configuration.

        Args:
            variant: Benchmark variant, forwarded as ``--variant``.
            datasize: Dataset size, forwarded as ``--datasize``.
            tasks: Task count, forwarded as ``--tasks``.
            query_filter: Value forwarded as ``--query-filter``.
        """
        # Copy the base list: extending self.cmd in place would make the
        # per-run options pile up across successive calls.
        cmd = list(self.cmd)
        cmd += ["--variant", f"{variant}"]
        cmd += ["--datasize", f"{datasize}"]
        cmd += ["--tasks", f"{tasks}"]
        cmd += ["--query-filter", f"{query_filter}"]
        print(" ".join(cmd), file=sys.stderr)
        run_tests.run(cmd)


def test_tpc():
    """Run variant "h", datasize 1, one task, filtered to ``q1.sql``; report the results path."""
    tpc_runner = TestRunner()
    tpc_runner.wrapped_run("h", 1, 1, r"q1\.sql")
    results_location = tpc_runner.results_path.resolve()
    print("results path:", results_location, file=sys.stderr)
18 changes: 18 additions & 0 deletions ydb/library/benchmarks/runner/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ TEST_SRCS(
tpc_tests.py
)

# Tools and UDF modules that tpc_tests.py looks up with
# yatest.common.binary_path; keep this list in sync with
# TestRunner.DEPS and TestRunner.UDFS.
DEPENDS(
    ydb/library/yql/tools/dqrun
    ydb/library/benchmarks/gen_queries
    ydb/library/benchmarks/runner/result_compare
    ydb/library/benchmarks/runner/runner

    ydb/library/yql/udfs/common/set
    ydb/library/yql/udfs/common/url_base
    ydb/library/yql/udfs/common/datetime2
    ydb/library/yql/udfs/common/re2
)

# Source-tree files that tpc_tests.py looks up with
# yatest.common.source_path. Every entry carries the arcadia/ prefix,
# matching the two config-file entries.
DATA(
    arcadia/ydb/library/yql/tools/dqrun/examples/fs.conf
    arcadia/ydb/library/benchmarks/runner/runner/test-gateways.conf
    arcadia/contrib/tools/flame-graph
)

END()

RECURSE(
Expand Down

0 comments on commit c0db3dd

Please sign in to comment.