From e32ffb7d1d3a44f8a1049524aea857f3546ab8e0 Mon Sep 17 00:00:00 2001 From: Andrew Stalin Date: Mon, 9 Sep 2024 07:22:40 +0000 Subject: [PATCH 1/3] nemesis statistics workload --- ydb/tests/stability/ydb/test_stability.py | 16 ++ ydb/tests/stability/ydb/ya.make | 1 + ydb/tools/statistics_workload/__main__.py | 209 ++++++++++++++++++++++ ydb/tools/statistics_workload/ya.make | 12 ++ ydb/tools/ya.make | 1 + 5 files changed, 239 insertions(+) create mode 100644 ydb/tools/statistics_workload/__main__.py create mode 100644 ydb/tools/statistics_workload/ya.make diff --git a/ydb/tests/stability/ydb/test_stability.py b/ydb/tests/stability/ydb/test_stability.py index cd509dcb757f..643e2ba9c833 100644 --- a/ydb/tests/stability/ydb/test_stability.py +++ b/ydb/tests/stability/ydb/test_stability.py @@ -46,6 +46,7 @@ class TestSetupForStability(object): yatest_common.binary_path('ydb/tests/tools/nemesis/driver/nemesis'), yatest_common.binary_path('ydb/tools/simple_queue/simple_queue'), yatest_common.binary_path('ydb/tools/olap_workload/olap_workload'), + yatest_common.binary_path('ydb/tools/statistics_workload/statistics_workload'), ) @classmethod @@ -138,6 +139,21 @@ def test_olap_workload(self): self._stop_nemesis() + def test_statistics_workload(self): + self._start_nemesis() + duration = 90*60 + + for node_id, node in enumerate(self.kikimr_cluster.nodes.values()): + node.ssh_command( + f'screen -d -m bash -c "sudo /Berkanavt/nemesis/bin/statistics_workload --database /Root/db1 --duration {duration} --log_file /Berkanavt/nemesis/log/statistics_workload.log"', + raise_on_error=True + ) + + logger.info('Sleeping for {} minute(s)'.format(duration)) + time.sleep(duration) + + self._stop_nemesis() + @classmethod def _start_nemesis(cls): for node in cls.kikimr_cluster.nodes.values(): diff --git a/ydb/tests/stability/ydb/ya.make b/ydb/tests/stability/ydb/ya.make index 11bc858442b8..a0cc575e304a 100644 --- a/ydb/tests/stability/ydb/ya.make +++ b/ydb/tests/stability/ydb/ya.make @@ -15,6 +15,7 @@ DATA( DEPENDS( ydb/tools/simple_queue ydb/tools/olap_workload + ydb/tools/statistics_workload ydb/tools/cfg/bin ydb/tests/tools/nemesis/driver ) diff --git a/ydb/tools/statistics_workload/__main__.py b/ydb/tools/statistics_workload/__main__.py new file mode 100644 index 000000000000..efe5e0fb62f3 --- /dev/null +++ b/ydb/tools/statistics_workload/__main__.py @@ -0,0 +1,209 @@ +# -*- coding: utf-8 -*- +import argparse +import ydb +import logging +import time +import os +import random +import string + + +ydb.interceptor.monkey_patch_event_handler() + + +logger = logging.getLogger("StatisticsWorkload") + + +def random_string(length): + letters = string.ascii_lowercase + return bytes(''.join(random.choice(letters) for i in range(length)), encoding='utf8') + + +def random_type(): + return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String]) + + +def random_value(type): + if isinstance(type, ydb.OptionalType): + return random_value(type.item) + if type == ydb.PrimitiveType.Int64: + return random.randint(0, 1 << 31) + if type == ydb.PrimitiveType.String: + return random_string(random.randint(1, 32)) + + +class Workload(object): + def __init__(self, endpoint, database, duration, batch_size, batch_count): + self.database = database + self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database)) + self.pool = ydb.SessionPool(self.driver, size=200) + self.duration = duration + self.batch_size = batch_size + self.batch_count = batch_count + + def __enter__(self): + return self + + def __exit__(self, exc_type, 
exc_val, exc_tb):
+        self.pool.stop()
+        self.driver.stop()
+
+    def run_query_ignore_errors(self, callee):
+        try:
+            self.pool.retry_operation_sync(callee)
+        except Exception as e:
+            logger.error(f'{type(e)}, {e}')
+
+    def generate_batch(self, schema):
+        data = []
+        for i in range(self.batch_size):
+            data.append({c.name: random_value(c.type) for c in schema})
+        return data
+
+    def get_tables(self):
+        db = self.driver.scheme_client.list_directory(self.database)
+        return [t.name for t in db.children]
+
+    def create_table(self, table_name):
+        logger.info(f"create table '{table_name}'")
+        def callee(session):
+            session.execute_scheme(f"""
+                CREATE TABLE `{table_name}` (
+                    id Int64 NOT NULL,
+                    value Int64,
+                    PRIMARY KEY(id)
+                )
+                PARTITION BY HASH(id)
+                WITH (
+                    STORE = COLUMN
+                )
+            """)
+        self.run_query_ignore_errors(callee)
+
+    def enable_statistics(self, table_name):
+        def callee(session):
+            session.execute_scheme(f"""
+                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['id']}}`);
+            """)
+            session.execute_scheme(f"""
+                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['value']}}`);
+            """)
+        self.run_query_ignore_errors(callee)
+
+    def drop_table(self, table_name):
+        logger.info(f'drop table {table_name}')
+        def callee(session):
+            session.drop_table(table_name)
+        self.run_query_ignore_errors(callee)
+
+    def drop_all_tables_with_prefix(self, prefix):
+        for t in self.get_tables():
+            if t.startswith(prefix):
+                self.drop_table(self.database + "/" + t)
+
+    def list_columns(self, table_name):
+        def callee(session):
+            return session.describe_table(self.database + "/" + table_name).columns
+        return self.pool.retry_operation_sync(callee)
+
+    def add_batch(self, table_name, schema):
+        column_types = ydb.BulkUpsertColumns()
+        for c in schema:
+            column_types.add_column(c.name, c.type)
+        batch = self.generate_batch(schema)
+        logger.info(f"batch size: {len(batch)}")
+        self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types)
+
+    def add_data(self, table_name):
+        schema = self.list_columns(table_name)
+        for i in range(self.batch_count):
+            logger.info(f"add batch #{i}")
+            self.add_batch(table_name, schema)
+
+    def delete_from_table(self, table_name):
+        logger.info(f"delete from table '{table_name}'")
+        def callee(session):
+            session.transaction().execute(f"DELETE FROM `{table_name}`", commit_tx=True)
+        self.run_query_ignore_errors(callee)
+
+    def rows_count(self, table_name):
+        return self.driver.table_client.scan_query(f"SELECT count(*) FROM `{table_name}`").next().result_set.rows[0][0]
+
+    def analyze(self, table_name):
+        table_path = self.database + "/" + table_name
+        logger.info(f"analyze '{table_name}'")
+        def callee(session):
+            session.execute_scheme(f"ANALYZE `{table_path}`")
+        self.run_query_ignore_errors(callee)
+
+    def execute(self):
+        table_name = "test_table"
+        table_statistics = ".metadata/_statistics"
+
+        try:
+            logger.info("start new round")
+
+            self.pool.acquire()
+
+            self.delete_from_table(table_statistics)
+            if self.rows_count(table_statistics) > 0:
+                logger.error(f"table '{table_statistics}' is not empty")
+                return
+
+            self.drop_all_tables_with_prefix(table_name)
+            self.create_table(table_name)
+            
+            self.add_data(table_name)
+            count = self.rows_count(table_name)
+            logger.info(f"number of rows in table '{table_name}' {count}")
+            if count == 0:
+                logger.error(f"table {table_name} is empty")
+                return
+
+            logger.info("waiting to receive information about the table from scheme shard")
+            time.sleep(300)
+
+            self.analyze(table_name)
+
+            count = self.rows_count(table_statistics)
+            logger.info(f"number of rows in table '{table_statistics}' {count}")
+            if count == 0:
+                logger.error(f"table '{table_statistics}' is empty")
+        except Exception as e:
+            logger.error(f"{type(e)}, {e}")
+
+    def run(self):
+        started_at = time.time()
+        sleep_time = 20
+
+        while time.time() - started_at < self.duration:
+            self.execute()
+
+            logger.info(f"sleeping for {sleep_time} seconds")
+            time.sleep(sleep_time)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description="statistics stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument('--endpoint', default='localhost:2135', help='YDB endpoint to connect to')
+    parser.add_argument('--database', default=None, required=True, help='Database to connect to')
+    parser.add_argument('--duration', default=120, type=int, help='Workload duration in seconds')
+    parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk upsert')
+    parser.add_argument('--batch_count', default=10, type=int, help='The number of batches to be inserted')
+    parser.add_argument('--log_file', default='', help='Append log output to the specified file')
+
+    args = parser.parse_args()
+
+    if args.log_file:
+        logging.basicConfig(
+            filename=args.log_file,
+            filemode='a',
+            format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+            datefmt='%H:%M:%S',
+            level=logging.INFO
+        )
+
+    with Workload(args.endpoint, args.database, args.duration, args.batch_size, args.batch_count) as workload:
+        workload.run()
diff --git a/ydb/tools/statistics_workload/ya.make b/ydb/tools/statistics_workload/ya.make
new file mode 100644
index 000000000000..3f8b6df61176
--- /dev/null
+++ b/ydb/tools/statistics_workload/ya.make
@@ -0,0 +1,12 @@
+PY3_PROGRAM(statistics_workload)
+
+PY_SRCS(
+    __main__.py
+)
+
+PEERDIR(
+    ydb/public/sdk/python
+    library/python/monlib
+)
+
+END()
diff --git a/ydb/tools/ya.make b/ydb/tools/ya.make
index 375abee36446..18f92a24710a 100644
--- a/ydb/tools/ya.make
+++ b/ydb/tools/ya.make
@@ -5,6 +5,7 @@ RECURSE(
     query_replay_yt
     simple_queue
     olap_workload
+    statistics_workload
     tsserver
     tstool
     ydbd_slice

From 24489ae0cb686e87f992bce8eda84096ccaa9ceb Mon Sep 17 00:00:00 2001
From: Andrew Stalin
Date: Mon, 9 Sep 2024 08:09:11 +0000
Subject: [PATCH 2/3] fix linter error

---
 ydb/tools/statistics_workload/__main__.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/ydb/tools/statistics_workload/__main__.py b/ydb/tools/statistics_workload/__main__.py
index efe5e0fb62f3..9dee5dde51d2 100644
--- a/ydb/tools/statistics_workload/__main__.py
+++ b/ydb/tools/statistics_workload/__main__.py
@@ -3,7 +3,6 @@
 import ydb
 import logging
 import time
-import os
 import random
 import string
 
@@ -66,6 +65,7 @@ def get_tables(self):
 
     def create_table(self, table_name):
         logger.info(f"create table '{table_name}'")
+
         def callee(session):
             session.execute_scheme(f"""
                 CREATE TABLE `{table_name}` (
@@ -92,6 +92,7 @@ def callee(session):
 
     def drop_table(self, table_name):
         logger.info(f'drop table {table_name}')
+
         def callee(session):
             session.drop_table(table_name)
             self.run_query_ignore_errors(callee)
@@ -122,6 +123,7 @@ def add_data(self, table_name):
 
     def delete_from_table(self, table_name):
         logger.info(f"delete from table '{table_name}'")
+
        def callee(session):
             session.transaction().execute(f"DELETE FROM `{table_name}`", commit_tx=True)
self.run_query_ignore_errors(callee) @@ -132,6 +134,7 @@ def rows_count(self, table_name): def analyze(self, table_name): table_path = self.database + "/" + table_name logger.info(f"analyze '{table_name}'") + def callee(session): session.execute_scheme(f"ANALYZE `{table_path}`") self.run_query_ignore_errors(callee) @@ -152,7 +155,7 @@ def execute(self): self.drop_all_tables_with_prefix(table_name) self.create_table(table_name) - + self.add_data(table_name) count = self.rows_count(table_name) logger.info(f"number of rows in table '{table_name}' {count}") From 5b2a610f916568f7cca299893843497a50fbf683 Mon Sep 17 00:00:00 2001 From: Andrew Stalin Date: Tue, 10 Sep 2024 08:41:37 +0000 Subject: [PATCH 3/3] fix review remarks --- ydb/tests/stability/ydb/test_stability.py | 11 +++++++---- ydb/tools/statistics_workload/__main__.py | 20 +++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ydb/tests/stability/ydb/test_stability.py b/ydb/tests/stability/ydb/test_stability.py index 643e2ba9c833..ed45bb93e9ac 100644 --- a/ydb/tests/stability/ydb/test_stability.py +++ b/ydb/tests/stability/ydb/test_stability.py @@ -141,16 +141,19 @@ def test_olap_workload(self): def test_statistics_workload(self): self._start_nemesis() - duration = 90*60 + + log_file = "/Berkanavt/nemesis/log/statistics_workload.log" + test_path = "/Berkanavt/nemesis/bin/statistics_workload" for node_id, node in enumerate(self.kikimr_cluster.nodes.values()): node.ssh_command( - f'screen -d -m bash -c "sudo /Berkanavt/nemesis/bin/statistics_workload --database /Root/db1 --duration {duration} --log_file /Berkanavt/nemesis/log/statistics_workload.log"', + f'screen -d -m bash -c "while true; do sudo {test_path} --database /Root/db1 --log_file {log_file} ; done"', raise_on_error=True ) + sleep_time_min = 90 - logger.info('Sleeping for {} minute(s)'.format(duration)) - time.sleep(duration) + logger.info('Sleeping for {} minute(s)'.format(sleep_time_min)) + time.sleep(sleep_time_min*60) self._stop_nemesis() diff --git a/ydb/tools/statistics_workload/__main__.py b/ydb/tools/statistics_workload/__main__.py index 9dee5dde51d2..399120167094 100644 --- a/ydb/tools/statistics_workload/__main__.py +++ b/ydb/tools/statistics_workload/__main__.py @@ -3,6 +3,7 @@ import ydb import logging import time +import os import random import string @@ -13,6 +14,14 @@ logger = logging.getLogger("StatisticsWorkload") +def timestamp(): + return int(1000 * time.time()) + + +def table_name_with_timestamp(table_prefix): + return os.path.join(table_prefix + "_" + str(timestamp())) + + def random_string(length): letters = string.ascii_lowercase return bytes(''.join(random.choice(letters) for i in range(length)), encoding='utf8') @@ -140,7 +149,8 @@ def callee(session): self.run_query_ignore_errors(callee) def execute(self): - table_name = "test_table" + table_prefix = "test_table" + table_name = table_name_with_timestamp(table_prefix) table_statistics = ".metadata/_statistics" try: @@ -153,7 +163,7 @@ def execute(self): logger.error(f"table '{table_statistics}' is not empty") return - self.drop_all_tables_with_prefix(table_name) + self.drop_all_tables_with_prefix(table_prefix) self.create_table(table_name) self.add_data(table_name) @@ -177,14 +187,10 @@ def execute(self): def run(self): started_at = time.time() - sleep_time = 20 while time.time() - started_at < self.duration: self.execute() - logger.info(f"sleeping for {sleep_time} seconds") - time.sleep(sleep_time) - if __name__ == '__main__': parser = argparse.ArgumentParser( @@ -194,7 
+200,7 @@ def run(self):
     parser.add_argument('--database', default=None, required=True, help='Database to connect to')
     parser.add_argument('--duration', default=120, type=int, help='Workload duration in seconds')
     parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk upsert')
-    parser.add_argument('--batch_count', default=10, type=int, help='The number of batches to be inserted')
+    parser.add_argument('--batch_count', default=3, type=int, help='The number of batches to be inserted')
     parser.add_argument('--log_file', default='', help='Append log output to the specified file')
 
     args = parser.parse_args()
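
--
A minimal usage sketch of the tool this series adds. The endpoint and database values are illustrative (they mirror the tool's defaults and the /Root/db1 path used by the stability test above), and a reachable YDB cluster is assumed. Each round clears .metadata/_statistics, recreates the test table, bulk-upserts batches, and runs ANALYZE:

    # CLI: run workload rounds for 60 seconds against a local cluster
    statistics_workload --endpoint localhost:2135 --database /Root/db1 --duration 60

    # Programmatic equivalent, via the Workload context manager from __main__.py
    with Workload('localhost:2135', '/Root/db1', duration=60, batch_size=100, batch_count=3) as workload:
        workload.run()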