Nemesis statistics workload #8919

Merged
18 changes: 18 additions & 0 deletions ydb/tests/stability/ydb/test_stability.py
@@ -46,6 +46,7 @@ class TestSetupForStability(object):
yatest_common.binary_path('ydb/tests/tools/nemesis/driver/nemesis'),
yatest_common.binary_path('ydb/tools/simple_queue/simple_queue'),
yatest_common.binary_path('ydb/tools/olap_workload/olap_workload'),
yatest_common.binary_path('ydb/tools/statistics_workload/statistics_workload'),
)

@classmethod
@@ -138,6 +139,23 @@ def test_olap_workload(self):

self._stop_nemesis()

def test_statistics_workload(self):
self._start_nemesis()

log_file = "/Berkanavt/nemesis/log/statistics_workload.log"
test_path = "/Berkanavt/nemesis/bin/statistics_workload"
node = list(self.kikimr_cluster.nodes.values())[0]
node.ssh_command(
f'screen -d -m bash -c "while true; do sudo {test_path} --database /Root/db1 --log_file {log_file} ; done"',
raise_on_error=True
)
sleep_time_min = 90

logger.info(f'Sleeping for {sleep_time_min} minute(s)')
time.sleep(sleep_time_min * 60)

self._stop_nemesis()
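
Note: the ssh command above does not pass --duration, so each workload process exits after the argparse default of 120 seconds, and the surrounding `while true` loop keeps restarting it for the full 90-minute window. A hypothetical variant that makes the per-run duration explicit (same placeholders as in the test):

f'screen -d -m bash -c "while true; do sudo {test_path} --database /Root/db1 --duration 120 --log_file {log_file} ; done"'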

@classmethod
def _start_nemesis(cls):
for node in cls.kikimr_cluster.nodes.values():
1 change: 1 addition & 0 deletions ydb/tests/stability/ydb/ya.make
@@ -15,6 +15,7 @@ DATA(
DEPENDS(
ydb/tools/simple_queue
ydb/tools/olap_workload
ydb/tools/statistics_workload
ydb/tools/cfg/bin
ydb/tests/tools/nemesis/driver
)
215 changes: 215 additions & 0 deletions ydb/tools/statistics_workload/__main__.py
@@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
import argparse
import ydb
import logging
import time
import random
import string


ydb.interceptor.monkey_patch_event_handler()


logger = logging.getLogger("StatisticsWorkload")


def table_name_with_prefix(table_prefix):
table_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
return table_prefix + "_" + table_suffix


def random_string(length):
letters = string.ascii_lowercase
return bytes(''.join(random.choice(letters) for i in range(length)), encoding='utf8')


def random_type():
return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String])


def random_value(type):
if isinstance(type, ydb.OptionalType):
return random_value(type.item)
if type == ydb.PrimitiveType.Int64:
return random.randint(0, 1 << 31)
if type == ydb.PrimitiveType.String:
return random_string(random.randint(1, 32))


class Workload(object):
def __init__(self, endpoint, database, duration, batch_size, batch_count):
self.database = database
self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
self.pool = ydb.SessionPool(self.driver, size=200)
self.duration = duration
self.batch_size = batch_size
self.batch_count = batch_count

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.stop()
self.driver.stop()

def run_query_ignore_errors(self, callee):
try:
self.pool.retry_operation_sync(callee)
except Exception as e:
logger.error(f'{type(e)}, {e}')

def generate_batch(self, schema):
data = []
for i in range(self.batch_size):
data.append({c.name: random_value(c.type) for c in schema})
return data

def get_tables(self):
db = self.driver.scheme_client.list_directory(self.database)
return [t.name for t in db.children]

def create_table(self, table_name):
logger.info(f"create table '{table_name}'")

def callee(session):
session.execute_scheme(f"""
CREATE TABLE `{table_name}` (
id Int64 NOT NULL,
value Int64,
PRIMARY KEY(id)
)
PARTITION BY HASH(id)
WITH (
STORE = COLUMN
)
""")
self.run_query_ignore_errors(callee)

def enable_statistics(self, table_name):
def callee(session):
session.execute_scheme(f"""
ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['id']}}`);
""")
session.execute_scheme(f"""
ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['value']}}`);
""")
self.run_query_ignore_errors(callee)
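
For background: COUNT_MIN_SKETCH in the ALTER OBJECT statements above refers to the count-min sketch, a probabilistic structure for approximate per-value frequency counts in fixed memory. A minimal illustrative Python version (not YDB's implementation; the hash scheme and dimensions here are arbitrary choices):

import hashlib

class CountMinSketch:
    """Toy count-min sketch: approximate frequencies, never undercounted."""

    def __init__(self, width=256, depth=4):
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _cells(self, key):
        # One salted SHA-256-derived hash per row.
        for i in range(self.depth):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield i, int.from_bytes(digest[:8], "big") % self.width

    def add(self, key, count=1):
        for i, j in self._cells(key):
            self.rows[i][j] += count

    def estimate(self, key):
        # Collisions can only inflate a cell, so the minimum is the best bound.
        return min(self.rows[i][j] for i, j in self._cells(key))

For example, after cms = CountMinSketch(); cms.add(42), cms.estimate(42) returns at least 1.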

def drop_table(self, table_name):
logger.info(f'drop table {table_name}')

def callee(session):
session.drop_table(table_name)
self.run_query_ignore_errors(callee)

def drop_all_tables_with_prefix(self, prefix):
for t in self.get_tables():
if t.startswith(prefix):
self.drop_table(self.database + "/" + t)

def list_columns(self, table_name):
def callee(session):
return session.describe_table(self.database + "/" + table_name).columns
return self.pool.retry_operation_sync(callee)

def add_batch(self, table_name, schema):
column_types = ydb.BulkUpsertColumns()
for c in schema:
column_types.add_column(c.name, c.type)
batch = self.generate_batch(schema)
logger.info(f"batch size: {len(batch)}")
self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types)

def add_data(self, table_name):
schema = self.list_columns(table_name)
for i in range(self.batch_count):
logger.info(f"add batch #{i}")
self.add_batch(table_name, schema)

def delete_from_table(self, table_name):
logger.info(f"delete from table '{table_name}'")

def callee(session):
session.transaction().execute(f"DELETE FROM `{table_name}`", commit_tx=True)
self.run_query_ignore_errors(callee)

def rows_count(self, table_name):
return self.driver.table_client.scan_query(f"SELECT count(*) FROM `{table_name}`").next().result_set.rows[0][0]

def analyze(self, table_name):
table_path = self.database + "/" + table_name
logger.info(f"analyze '{table_name}'")

def callee(session):
session.execute_scheme(f"ANALYZE `{table_path}`")
self.run_query_ignore_errors(callee)

def execute(self):
table_prefix = "test_table"
table_name = table_name_with_prefix(table_prefix)
table_statistics = ".metadata/_statistics"

try:
logger.info("start new round")

self.pool.acquire()

self.delete_from_table(table_statistics)
if self.rows_count(table_statistics) > 0:
logger.error(f"table '{table_statistics}' is not empty")
return

self.drop_all_tables_with_prefix(table_prefix)
self.create_table(table_name)

self.add_data(table_name)
count = self.rows_count(table_name)
logger.info(f"number of rows in table '{table_name}' {count}")
if count == 0:
logger.error(f"table {table_name} is empty")
return

logger.info("waiting to receive information about the table from scheme shard")
time.sleep(300)
Collaborator:
Are these sleeps really necessary? A 5-minute sleep looks like overkill.

Collaborator (Author):
This one is definitely needed. It waits until information about the table reaches the statistics aggregator; otherwise the ANALYZE command runs idle and no statistics are collected. That is a peculiarity of the implementation.

Collaborator:
Why exactly 5 minutes?

Collaborator (Author):
3 minutes for the SchemeShard trigger scheduler plus 1 minute for the StatisticsAggregator traversal scheduler, so 5 minutes leaves a margin.
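
A sketch of a polling alternative to the fixed sleep (hypothetical; assumes ANALYZE is safe to re-issue and that rows appear in .metadata/_statistics only after the aggregator's traversal completes):

    def analyze_with_retry(self, table_name, table_statistics, timeout_s=600, poll_s=30):
        # Hypothetical alternative to the fixed 300-second sleep: re-run ANALYZE
        # and poll the statistics table until rows appear or the deadline passes.
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            self.analyze(table_name)
            if self.rows_count(table_statistics) > 0:
                return True
            time.sleep(poll_s)
        return False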


self.analyze(table_name)

count = self.rows_count(table_statistics)
logger.info(f"number of rows in table '{table_statistics}' {count}")
if count == 0:
logger.error(f"table '{table_statistics}' is empty")
except Exception as e:
logger.error(f"{type(e)}, {e}")

def run(self):
started_at = time.time()

while time.time() - started_at < self.duration:
self.execute()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="statistics stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
parser.add_argument('--database', default=None, required=True, help='A database to connect to')
parser.add_argument('--duration', default=120, type=int, help='Duration of the workload in seconds')
parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk insert')
parser.add_argument('--batch_count', default=3, type=int, help='The number of batches to be inserted')
parser.add_argument('--log_file', default=None, help='Append log to the specified file')

args = parser.parse_args()

if args.log_file:
logging.basicConfig(
filename=args.log_file,
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO
)

with Workload(args.endpoint, args.database, args.duration, args.batch_size, args.batch_count) as workload:
workload.run()
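
For reference, a hypothetical stand-alone invocation (flags as defined by the argparse section above; the endpoint, database, and paths are placeholders):

statistics_workload --endpoint localhost:2135 --database /Root/db1 --duration 3600 --batch_size 1000 --batch_count 3 --log_file /tmp/statistics_workload.log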
12 changes: 12 additions & 0 deletions ydb/tools/statistics_workload/ya.make
@@ -0,0 +1,12 @@
PY3_PROGRAM(statistics_workload)

PY_SRCS(
__main__.py
)

PEERDIR(
ydb/public/sdk/python
library/python/monlib
)

END()
1 change: 1 addition & 0 deletions ydb/tools/ya.make
@@ -5,6 +5,7 @@ RECURSE(
query_replay_yt
simple_queue
olap_workload
statistics_workload
tsserver
tstool
ydbd_slice