From 782bf66fb13837cbede1d4120f912eaeeecb4324 Mon Sep 17 00:00:00 2001 From: pseusys Date: Fri, 4 Oct 2024 22:38:47 +0800 Subject: [PATCH] ydb finished --- chatsky/context_storages/mongo.py | 62 ++-- chatsky/context_storages/redis.py | 44 +-- chatsky/context_storages/sql.py | 73 ++--- chatsky/context_storages/ydb.py | 486 +++++++++++++--------------- chatsky/utils/testing/cleanup_db.py | 14 +- 5 files changed, 314 insertions(+), 365 deletions(-) diff --git a/chatsky/context_storages/mongo.py b/chatsky/context_storages/mongo.py index e4470c82a..d6ec0cf38 100644 --- a/chatsky/context_storages/mongo.py +++ b/chatsky/context_storages/mongo.py @@ -61,19 +61,19 @@ def __init__( self._mongo = AsyncIOMotorClient(self.full_path, uuidRepresentation="standard") db = self._mongo.get_default_database() - self._main_table = db[f"{collection_prefix}_{self._main_table_name}"] - self._turns_table = db[f"{collection_prefix}_{self._turns_table_name}"] - self._misc_table = db[f"{collection_prefix}_{self._misc_table_name}"] + self.main_table = db[f"{collection_prefix}_{self._main_table_name}"] + self.turns_table = db[f"{collection_prefix}_{self._turns_table_name}"] + self.misc_table = db[f"{collection_prefix}_{self._misc_table_name}"] asyncio.run( asyncio.gather( - self._main_table.create_index( + self.main_table.create_index( self._id_column_name, background=True, unique=True ), - self._turns_table.create_index( + self.turns_table.create_index( [self._id_column_name, self._key_column_name], background=True, unique=True ), - self._misc_table.create_index( + self.misc_table.create_index( [self._id_column_name, self._key_column_name], background=True, unique=True ) ) @@ -82,25 +82,25 @@ def __init__( # TODO: this method (and similar) repeat often. Optimize? def _get_config_for_field(self, field_name: str) -> Tuple[Collection, str, FieldConfig]: if field_name == self.labels_config.name: - return self._turns_table, field_name, self.labels_config + return self.turns_table, field_name, self.labels_config elif field_name == self.requests_config.name: - return self._turns_table, field_name, self.requests_config + return self.turns_table, field_name, self.requests_config elif field_name == self.responses_config.name: - return self._turns_table, field_name, self.responses_config + return self.turns_table, field_name, self.responses_config elif field_name == self.misc_config.name: - return self._misc_table, self._value_column_name, self.misc_config + return self.misc_table, self._value_column_name, self.misc_config else: raise ValueError(f"Unknown field name: {field_name}!") async def load_main_info(self, ctx_id: str) -> Optional[Tuple[int, int, int, bytes]]: - result = await self._main_table.find_one( + result = await self.main_table.find_one( {self._id_column_name: ctx_id}, [self._current_turn_id_column_name, self._created_at_column_name, self._updated_at_column_name, self._framework_data_column_name] ) return (result[self._current_turn_id_column_name], result[self._created_at_column_name], result[self._updated_at_column_name], result[self._framework_data_column_name]) if result is not None else None async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: int, fw_data: bytes) -> None: - await self._main_table.update_one( + await self.main_table.update_one( {self._id_column_name: ctx_id}, { "$set": { @@ -116,54 +116,54 @@ async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: async def delete_context(self, ctx_id: str) -> None: await asyncio.gather( - 
self._main_table.delete_one({self._id_column_name: ctx_id}),
-            self._turns_table.delete_one({self._id_column_name: ctx_id}),
-            self._misc_table.delete_one({self._id_column_name: ctx_id})
+            self.main_table.delete_one({self._id_column_name: ctx_id}),
+            self.turns_table.delete_many({self._id_column_name: ctx_id}),
+            self.misc_table.delete_many({self._id_column_name: ctx_id})
         )
 
     async def load_field_latest(self, ctx_id: str, field_name: str) -> List[Tuple[Hashable, bytes]]:
-        field_table, field_name, field_config = self._get_config_for_field(field_name)
+        field_table, key_name, field_config = self._get_config_for_field(field_name)
         sort, limit, key = None, 0, dict()
-        if field_table == self._turns_table:
+        if field_table == self.turns_table:
             sort = [(self._key_column_name, -1)]
             if isinstance(field_config.subscript, int):
                 limit = field_config.subscript
             elif isinstance(field_config.subscript, Set):
                 key = {self._key_column_name: {"$in": list(field_config.subscript)}}
         result = await field_table.find(
-            {self._id_column_name: ctx_id, field_name: {"$exists": True, "$ne": None}, **key},
-            [self._key_column_name, field_name],
+            {self._id_column_name: ctx_id, key_name: {"$exists": True, "$ne": None}, **key},
+            [self._key_column_name, key_name],
             sort=sort
         ).limit(limit).to_list(None)
-        return [(item[self._key_column_name], item[field_name]) for item in result]
+        return [(item[self._key_column_name], item[key_name]) for item in result]
 
     async def load_field_keys(self, ctx_id: str, field_name: str) -> List[Hashable]:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
+        field_table, key_name, _ = self._get_config_for_field(field_name)
         result = await field_table.aggregate(
             [
-                {"$match": {self._id_column_name: ctx_id, field_name: {"$ne": None}}},
+                {"$match": {self._id_column_name: ctx_id, key_name: {"$ne": None}}},
                 {"$group": {"_id": None, self._UNIQUE_KEYS: {"$addToSet": f"${self._key_column_name}"}}},
             ]
         ).to_list(None)
         return result[0][self._UNIQUE_KEYS] if len(result) == 1 else list()
 
     async def load_field_items(self, ctx_id: str, field_name: str, keys: Set[Hashable]) -> List[bytes]:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
+        field_table, key_name, _ = self._get_config_for_field(field_name)
         result = await field_table.find(
-            {self._id_column_name: ctx_id, self._key_column_name: {"$in": list(keys)}, field_name: {"$exists": True, "$ne": None}},
-            [self._key_column_name, field_name]
+            {self._id_column_name: ctx_id, self._key_column_name: {"$in": list(keys)}, key_name: {"$exists": True, "$ne": None}},
+            [self._key_column_name, key_name]
         ).to_list(None)
-        return [(item[self._key_column_name], item[field_name]) for item in result]
+        return [(item[self._key_column_name], item[key_name]) for item in result]
 
     async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tuple[Hashable, bytes]]) -> None:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
+        field_table, key_name, _ = self._get_config_for_field(field_name)
         if len(items) == 0:
             return
         await field_table.bulk_write(
             [
                 UpdateOne(
                     {self._id_column_name: ctx_id, self._key_column_name: k},
-                    {"$set": {field_name: v}},
+                    {"$set": {key_name: v}},
                     upsert=True,
                 ) for k, v in items
             ]
         )
@@ -171,7 +171,7 @@ async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tup
 
     async def clear_all(self) -> None:
         await asyncio.gather(
-            self._main_table.delete_many({}),
-            self._turns_table.delete_many({}),
-            self._misc_table.delete_many({})
+            self.main_table.delete_many({}),
+            
self.turns_table.delete_many({}), + self.misc_table.delete_many({}) ) diff --git a/chatsky/context_storages/redis.py b/chatsky/context_storages/redis.py index b41b561a0..418c48af3 100644 --- a/chatsky/context_storages/redis.py +++ b/chatsky/context_storages/redis.py @@ -68,7 +68,7 @@ def __init__( raise ImportError("`redis` package is missing.\n" + install_suggestion) if not bool(key_prefix): raise ValueError("`key_prefix` parameter shouldn't be empty") - self._redis = Redis.from_url(self.full_path) + self.database = Redis.from_url(self.full_path) self._prefix = key_prefix self._main_key = f"{key_prefix}:{self._main_table_name}" @@ -97,12 +97,12 @@ def _get_config_for_field(self, field_name: str, ctx_id: str) -> Tuple[str, Call raise ValueError(f"Unknown field name: {field_name}!") async def load_main_info(self, ctx_id: str) -> Optional[Tuple[int, int, int, bytes]]: - if await self._redis.exists(f"{self._main_key}:{ctx_id}"): + if await self.database.exists(f"{self._main_key}:{ctx_id}"): cti, ca, ua, fd = await gather( - self._redis.hget(f"{self._main_key}:{ctx_id}", self._current_turn_id_column_name), - self._redis.hget(f"{self._main_key}:{ctx_id}", self._created_at_column_name), - self._redis.hget(f"{self._main_key}:{ctx_id}", self._updated_at_column_name), - self._redis.hget(f"{self._main_key}:{ctx_id}", self._framework_data_column_name) + self.database.hget(f"{self._main_key}:{ctx_id}", self._current_turn_id_column_name), + self.database.hget(f"{self._main_key}:{ctx_id}", self._created_at_column_name), + self.database.hget(f"{self._main_key}:{ctx_id}", self._updated_at_column_name), + self.database.hget(f"{self._main_key}:{ctx_id}", self._framework_data_column_name) ) return (int(cti), int(ca), int(ua), fd) else: @@ -110,50 +110,50 @@ async def load_main_info(self, ctx_id: str) -> Optional[Tuple[int, int, int, byt async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: int, fw_data: bytes) -> None: await gather( - self._redis.hset(f"{self._main_key}:{ctx_id}", self._current_turn_id_column_name, str(turn_id)), - self._redis.hset(f"{self._main_key}:{ctx_id}", self._created_at_column_name, str(crt_at)), - self._redis.hset(f"{self._main_key}:{ctx_id}", self._updated_at_column_name, str(upd_at)), - self._redis.hset(f"{self._main_key}:{ctx_id}", self._framework_data_column_name, fw_data) + self.database.hset(f"{self._main_key}:{ctx_id}", self._current_turn_id_column_name, str(turn_id)), + self.database.hset(f"{self._main_key}:{ctx_id}", self._created_at_column_name, str(crt_at)), + self.database.hset(f"{self._main_key}:{ctx_id}", self._updated_at_column_name, str(upd_at)), + self.database.hset(f"{self._main_key}:{ctx_id}", self._framework_data_column_name, fw_data) ) async def delete_context(self, ctx_id: str) -> None: - keys = await self._redis.keys(f"{self._prefix}:*:{ctx_id}*") + keys = await self.database.keys(f"{self._prefix}:*:{ctx_id}*") if len(keys) > 0: - await self._redis.delete(*keys) + await self.database.delete(*keys) async def load_field_latest(self, ctx_id: str, field_name: str) -> List[Tuple[Hashable, bytes]]: field_key, field_converter, field_config = self._get_config_for_field(field_name, ctx_id) - keys = await self._redis.hkeys(field_key) + keys = await self.database.hkeys(field_key) if field_key.startswith(self._turns_key): keys = sorted(keys, key=lambda k: int(k), reverse=True) if isinstance(field_config.subscript, int): keys = keys[:field_config.subscript] elif isinstance(field_config.subscript, Set): keys = [k for k in keys if k in 
self._keys_to_bytes(field_config.subscript)] - values = await gather(*[self._redis.hget(field_key, k) for k in keys]) + values = await gather(*[self.database.hget(field_key, k) for k in keys]) return [(k, v) for k, v in zip(field_converter(keys), values)] async def load_field_keys(self, ctx_id: str, field_name: str) -> List[Hashable]: field_key, field_converter, _ = self._get_config_for_field(field_name, ctx_id) - return field_converter(await self._redis.hkeys(field_key)) + return field_converter(await self.database.hkeys(field_key)) async def load_field_items(self, ctx_id: str, field_name: str, keys: List[Hashable]) -> List[Tuple[Hashable, bytes]]: field_key, field_converter, _ = self._get_config_for_field(field_name, ctx_id) - load = [k for k in await self._redis.hkeys(field_key) if k in self._keys_to_bytes(keys)] - values = await gather(*[self._redis.hget(field_key, k) for k in load]) + load = [k for k in await self.database.hkeys(field_key) if k in self._keys_to_bytes(keys)] + values = await gather(*[self.database.hget(field_key, k) for k in load]) return [(k, v) for k, v in zip(field_converter(load), values)] async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tuple[Hashable, bytes]]) -> None: field_key, _, _ = self._get_config_for_field(field_name, ctx_id) - await gather(*[self._redis.hset(field_key, str(k), v) for k, v in items]) + await gather(*[self.database.hset(field_key, str(k), v) for k, v in items]) async def delete_field_keys(self, ctx_id: str, field_name: str, keys: List[Hashable]) -> None: field_key, _, _ = self._get_config_for_field(field_name, ctx_id) - match = [k for k in await self._redis.hkeys(field_key) if k in self._keys_to_bytes(keys)] + match = [k for k in await self.database.hkeys(field_key) if k in self._keys_to_bytes(keys)] if len(match) > 0: - await self._redis.hdel(field_key, *match) + await self.database.hdel(field_key, *match) async def clear_all(self) -> None: - keys = await self._redis.keys(f"{self._prefix}:*") + keys = await self.database.keys(f"{self._prefix}:*") if len(keys) > 0: - await self._redis.delete(*keys) + await self.database.delete(*keys) diff --git a/chatsky/context_storages/sql.py b/chatsky/context_storages/sql.py index b87201922..a6fff97dd 100644 --- a/chatsky/context_storages/sql.py +++ b/chatsky/context_storages/sql.py @@ -145,7 +145,8 @@ class SQLContextStorage(DBContextStorage): _FIELD_LENGTH = 256 def __init__( - self, path: str, + self, + path: str, rewrite_existing: bool = False, configuration: Optional[Dict[str, FieldConfig]] = None, table_name_prefix: str = "chatsky_table", @@ -161,30 +162,30 @@ def __init__( if self.dialect == "sqlite": event.listen(self.engine.sync_engine, "connect", _sqlite_enable_foreign_key) - self._metadata = MetaData() - self._main_table = Table( + metadata = MetaData() + self.main_table = Table( f"{table_name_prefix}_{self._main_table_name}", - self._metadata, + metadata, Column(self._id_column_name, String(self._UUID_LENGTH), index=True, unique=True, nullable=False), Column(self._current_turn_id_column_name, BigInteger(), nullable=False), Column(self._created_at_column_name, BigInteger(), nullable=False), Column(self._updated_at_column_name, BigInteger(), nullable=False), Column(self._framework_data_column_name, LargeBinary(), nullable=False), ) - self._turns_table = Table( + self.turns_table = Table( f"{table_name_prefix}_{self._turns_table_name}", - self._metadata, - Column(self._id_column_name, String(self._UUID_LENGTH), ForeignKey(self._main_table.name, self._id_column_name), 
nullable=False), + metadata, + Column(self._id_column_name, String(self._UUID_LENGTH), ForeignKey(self.main_table.name, self._id_column_name), nullable=False), Column(self._key_column_name, Integer(), nullable=False), Column(self.labels_config.name, LargeBinary(), nullable=True), Column(self.requests_config.name, LargeBinary(), nullable=True), Column(self.responses_config.name, LargeBinary(), nullable=True), Index(f"{self._turns_table_name}_index", self._id_column_name, self._key_column_name, unique=True), ) - self._misc_table = Table( + self.misc_table = Table( f"{table_name_prefix}_{self._misc_table_name}", - self._metadata, - Column(self._id_column_name, String(self._UUID_LENGTH), ForeignKey(self._main_table.name, self._id_column_name), nullable=False), + metadata, + Column(self._id_column_name, String(self._UUID_LENGTH), ForeignKey(self.main_table.name, self._id_column_name), nullable=False), Column(self._key_column_name, String(self._FIELD_LENGTH), nullable=False), Column(self._value_column_name, LargeBinary(), nullable=True), Index(f"{self._misc_table_name}_index", self._id_column_name, self._key_column_name, unique=True), @@ -201,7 +202,7 @@ async def _create_self_tables(self): Create tables required for context storing, if they do not exist yet. """ async with self.engine.begin() as conn: - for table in [self._main_table, self._turns_table, self._misc_table]: + for table in [self.main_table, self.turns_table, self.misc_table]: if not await conn.run_sync(lambda sync_conn: inspect(sync_conn).has_table(table.name)): await conn.run_sync(table.create, self.engine) @@ -224,24 +225,24 @@ def _check_availability(self): # TODO: this method (and similar) repeat often. Optimize? def _get_config_for_field(self, field_name: str) -> Tuple[Table, str, FieldConfig]: if field_name == self.labels_config.name: - return self._turns_table, field_name, self.labels_config + return self.turns_table, field_name, self.labels_config elif field_name == self.requests_config.name: - return self._turns_table, field_name, self.requests_config + return self.turns_table, field_name, self.requests_config elif field_name == self.responses_config.name: - return self._turns_table, field_name, self.responses_config + return self.turns_table, field_name, self.responses_config elif field_name == self.misc_config.name: - return self._misc_table, self._value_column_name, self.misc_config + return self.misc_table, self._value_column_name, self.misc_config else: raise ValueError(f"Unknown field name: {field_name}!") async def load_main_info(self, ctx_id: str) -> Optional[Tuple[int, int, int, bytes]]: - stmt = select(self._main_table).where(self._main_table.c[self._id_column_name] == ctx_id) + stmt = select(self.main_table).where(self.main_table.c[self._id_column_name] == ctx_id) async with self.engine.begin() as conn: result = (await conn.execute(stmt)).fetchone() return None if result is None else result[1:] async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: int, fw_data: bytes) -> None: - insert_stmt = self._INSERT_CALLABLE(self._main_table).values( + insert_stmt = self._INSERT_CALLABLE(self.main_table).values( { self._id_column_name: ctx_id, self._current_turn_id_column_name: turn_id, @@ -263,16 +264,16 @@ async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: async def delete_context(self, ctx_id: str) -> None: async with self.engine.begin() as conn: await asyncio.gather( - conn.execute(delete(self._main_table).where(self._main_table.c[self._id_column_name] == 
ctx_id)),
-                conn.execute(delete(self._turns_table).where(self._turns_table.c[self._id_column_name] == ctx_id)),
-                conn.execute(delete(self._misc_table).where(self._misc_table.c[self._id_column_name] == ctx_id)),
+                conn.execute(delete(self.main_table).where(self.main_table.c[self._id_column_name] == ctx_id)),
+                conn.execute(delete(self.turns_table).where(self.turns_table.c[self._id_column_name] == ctx_id)),
+                conn.execute(delete(self.misc_table).where(self.misc_table.c[self._id_column_name] == ctx_id)),
             )
 
     async def load_field_latest(self, ctx_id: str, field_name: str) -> List[Tuple[Hashable, bytes]]:
-        field_table, field_name, field_config = self._get_config_for_field(field_name)
-        stmt = select(field_table.c[self._key_column_name], field_table.c[field_name])
-        stmt = stmt.where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[field_name] != None))
-        if field_table == self._turns_table:
+        field_table, key_name, field_config = self._get_config_for_field(field_name)
+        stmt = select(field_table.c[self._key_column_name], field_table.c[key_name])
+        stmt = stmt.where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[key_name] != None))
+        if field_table == self.turns_table:
             stmt = stmt.order_by(field_table.c[self._key_column_name].desc())
             if isinstance(field_config.subscript, int):
                 stmt = stmt.limit(field_config.subscript)
@@ -282,37 +283,37 @@ async def load_field_latest(self, ctx_id: str, field_name: str) -> List[Tuple[Ha
             return list((await conn.execute(stmt)).fetchall())
 
     async def load_field_keys(self, ctx_id: str, field_name: str) -> List[Hashable]:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
-        stmt = select(field_table.c[self._key_column_name]).where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[field_name] != None))
+        field_table, key_name, _ = self._get_config_for_field(field_name)
+        stmt = select(field_table.c[self._key_column_name]).where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[key_name] != None))
         async with self.engine.begin() as conn:
             return [k[0] for k in (await conn.execute(stmt)).fetchall()]
 
     async def load_field_items(self, ctx_id: str, field_name: str, keys: List[Hashable]) -> List[bytes]:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
-        stmt = select(field_table.c[self._key_column_name], field_table.c[field_name])
-        stmt = stmt.where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[self._key_column_name].in_(tuple(keys))) & (field_table.c[field_name] != None))
+        field_table, key_name, _ = self._get_config_for_field(field_name)
+        stmt = select(field_table.c[self._key_column_name], field_table.c[key_name])
+        stmt = stmt.where((field_table.c[self._id_column_name] == ctx_id) & (field_table.c[self._key_column_name].in_(tuple(keys))) & (field_table.c[key_name] != None))
         async with self.engine.begin() as conn:
             return list((await conn.execute(stmt)).fetchall())
 
     async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tuple[Hashable, bytes]]) -> None:
-        field_table, field_name, _ = self._get_config_for_field(field_name)
+        field_table, key_name, _ = self._get_config_for_field(field_name)
         if len(items) == 0:
             return
         if field_name == self.misc_config.name and any(len(k) > self._FIELD_LENGTH for k, _ in items):
             raise ValueError(f"Field key length exceeds the limit of {self._FIELD_LENGTH} characters!")
         insert_stmt = 
self._INSERT_CALLABLE(field_table).values( [ { self._id_column_name: ctx_id, self._key_column_name: k, - field_name: v, + key_name: v, } for k, v in items ] ) update_stmt = _get_upsert_stmt( self.dialect, insert_stmt, - [field_name], + [key_name], [self._id_column_name, self._key_column_name], ) async with self.engine.begin() as conn: @@ -321,7 +322,7 @@ async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tup async def clear_all(self) -> None: async with self.engine.begin() as conn: await asyncio.gather( - conn.execute(delete(self._main_table)), - conn.execute(delete(self._turns_table)), - conn.execute(delete(self._misc_table)) + conn.execute(delete(self.main_table)), + conn.execute(delete(self.turns_table)), + conn.execute(delete(self.misc_table)) ) diff --git a/chatsky/context_storages/ydb.py b/chatsky/context_storages/ydb.py index 8735238aa..58f833e0a 100644 --- a/chatsky/context_storages/ydb.py +++ b/chatsky/context_storages/ydb.py @@ -10,9 +10,9 @@ take advantage of the scalability and high-availability features provided by the service. """ -import asyncio +from asyncio import gather, run from os.path import join -from typing import Any, Set, Tuple, List, Dict, Optional +from typing import Awaitable, Callable, Hashable, Set, Tuple, List, Dict, Optional from urllib.parse import urlsplit from .database import DBContextStorage, FieldConfig @@ -26,9 +26,9 @@ Column, OptionalType, PrimitiveType, - TableIndex, ) from ydb.aio import Driver, SessionPool + from ydb.table import Session ydb_available = True except ImportError: @@ -55,345 +55,291 @@ class YDBContextStorage(DBContextStorage): :param table_name: The name of the table to use. """ - _CONTEXTS_TABLE = "contexts" - _LOGS_TABLE = "logs" - _KEY_COLUMN = "key" - _VALUE_COLUMN = "value" - _FIELD_COLUMN = "field" - _PACKED_COLUMN = "data" + is_asynchronous = True def __init__( self, path: str, - serializer: Optional[Any] = None, rewrite_existing: bool = False, - turns_config: Optional[FieldConfig] = None, - misc_config: Optional[FieldConfig] = None, + configuration: Optional[Dict[str, FieldConfig]] = None, table_name_prefix: str = "chatsky_table", - timeout=5, + timeout: int = 5, ): - DBContextStorage.__init__(self, path, serializer, rewrite_existing, turns_config, misc_config) - self.context_schema.supports_async = True + DBContextStorage.__init__(self, path, rewrite_existing, configuration) protocol, netloc, self.database, _, _ = urlsplit(path) - self.endpoint = "{}://{}".format(protocol, netloc) if not ydb_available: install_suggestion = get_protocol_install_suggestion("grpc") raise ImportError("`ydb` package is missing.\n" + install_suggestion) self.table_prefix = table_name_prefix - self.driver, self.pool = asyncio.run(_init_drive(timeout, self.endpoint, self.database, table_name_prefix)) - - async def del_item_async(self, key: str): - async def callee(session): - query = f""" - PRAGMA TablePathPrefix("{self.database}"); - DECLARE ${ExtraFields.storage_key.value} AS Utf8; - UPDATE {self.table_prefix}_{self._CONTEXTS_TABLE} SET {ExtraFields.active_ctx.value}=False - WHERE {ExtraFields.storage_key.value} == ${ExtraFields.storage_key.value}; - """ + run(self._init_drive(timeout, f"{protocol}://{netloc}")) + + async def _init_drive(self, timeout: int, endpoint: str) -> None: + self._driver = Driver(endpoint=endpoint, database=self.database) + client_settings = self._driver.table_client._table_client_settings.with_allow_truncated_result(True) + self._driver.table_client._table_client_settings = client_settings + 
await self._driver.wait(fail_fast=True, timeout=timeout) + + self.pool = SessionPool(self._driver, size=10) + + self.main_table = f"{self.table_prefix}_{self._main_table_name}" + if not await self._does_table_exist(self.main_table): + await self._create_main_table(self.main_table) + + self.turns_table = f"{self.table_prefix}_{self._turns_table_name}" + if not await self._does_table_exist(self.turns_table): + await self._create_turns_table(self.turns_table) + + self.misc_table = f"{self.table_prefix}_{self._misc_table_name}" + if not await self._does_table_exist(self.misc_table): + await self._create_misc_table(self.misc_table) + + async def _does_table_exist(self, table_name: str) -> bool: + async def callee(session: Session) -> None: + await session.describe_table(join(self.database, table_name)) + + try: + await self.pool.retry_operation(callee) + return True + except SchemeError: + return False + + async def _create_main_table(self, table_name: str) -> None: + async def callee(session: Session) -> None: + await session.create_table( + "/".join([self.database, table_name]), + TableDescription() + .with_column(Column(self._id_column_name, PrimitiveType.Utf8)) + .with_column(Column(self._current_turn_id_column_name, PrimitiveType.Uint64)) + .with_column(Column(self._created_at_column_name, PrimitiveType.Uint64)) + .with_column(Column(self._updated_at_column_name, PrimitiveType.Uint64)) + .with_column(Column(self._framework_data_column_name, PrimitiveType.String)) + .with_primary_key(self._id_column_name) + ) - await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - {f"${ExtraFields.storage_key.value}": key}, - commit_tx=True, + await self.pool.retry_operation(callee) + + async def _create_turns_table(self, table_name: str) -> None: + async def callee(session: Session) -> None: + await session.create_table( + "/".join([self.database, table_name]), + TableDescription() + .with_column(Column(self._id_column_name, PrimitiveType.Utf8)) + .with_column(Column(self._key_column_name, PrimitiveType.Uint32)) + .with_column(Column(self.labels_config.name, OptionalType(PrimitiveType.String))) + .with_column(Column(self.requests_config.name, OptionalType(PrimitiveType.String))) + .with_column(Column(self.responses_config.name, OptionalType(PrimitiveType.String))) + .with_primary_keys(self._id_column_name, self._key_column_name) ) - return await self.pool.retry_operation(callee) + await self.pool.retry_operation(callee) + + async def _create_misc_table(self, table_name: str) -> None: + async def callee(session: Session) -> None: + await session.create_table( + "/".join([self.database, table_name]), + TableDescription() + .with_column(Column(self._id_column_name, PrimitiveType.Utf8)) + .with_column(Column(self._key_column_name, PrimitiveType.Utf8)) + .with_column(Column(self._value_column_name, OptionalType(PrimitiveType.String))) + .with_primary_keys(self._id_column_name, self._key_column_name) + ) - async def contains_async(self, key: str) -> bool: - async def callee(session): + await self.pool.retry_operation(callee) + + # TODO: this method (and similar) repeat often. Optimize? 
+ def _get_config_for_field(self, field_name: str) -> Tuple[str, str, FieldConfig]: + if field_name == self.labels_config.name: + return self.turns_table, field_name, self.labels_config + elif field_name == self.requests_config.name: + return self.turns_table, field_name, self.requests_config + elif field_name == self.responses_config.name: + return self.turns_table, field_name, self.responses_config + elif field_name == self.misc_config.name: + return self.misc_table, self._value_column_name, self.misc_config + else: + raise ValueError(f"Unknown field name: {field_name}!") + + # TODO: this method (and similar) repeat often. Optimize? + def _transform_keys(self, field_name: str, keys: List[Hashable]) -> List[str]: + if field_name == self.misc_config.name: + return [f"\"{e}\"" for e in keys] + elif field_name in (self.labels_config.name, self.requests_config.name, self.responses_config.name): + return [str(e) for e in keys] + else: + raise ValueError(f"Unknown field name: {field_name}!") + + async def load_main_info(self, ctx_id: str) -> Optional[Tuple[int, int, int, bytes]]: + async def callee(session: Session) -> Optional[Tuple[int, int, int, bytes]]: query = f""" PRAGMA TablePathPrefix("{self.database}"); - DECLARE ${ExtraFields.storage_key.value} AS Utf8; - SELECT COUNT(DISTINCT {ExtraFields.storage_key.value}) AS cnt - FROM {self.table_prefix}_{self._CONTEXTS_TABLE} - WHERE {ExtraFields.storage_key.value} == ${ExtraFields.storage_key.value} AND {ExtraFields.active_ctx.value} == True; + SELECT {self._current_turn_id_column_name}, {self._created_at_column_name}, {self._updated_at_column_name}, {self._framework_data_column_name} + FROM {self.main_table} + WHERE {self._id_column_name} = "{ctx_id}"; """ # noqa: E501 - result_sets = await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - {f"${ExtraFields.storage_key.value}": key}, - commit_tx=True, + await session.prepare(query), dict(), commit_tx=True ) - return result_sets[0].rows[0].cnt != 0 if len(result_sets[0].rows) > 0 else False + return ( + result_sets[0].rows[0][self._current_turn_id_column_name], + result_sets[0].rows[0][self._created_at_column_name], + result_sets[0].rows[0][self._updated_at_column_name], + result_sets[0].rows[0][self._framework_data_column_name], + ) if len(result_sets[0].rows) > 0 else None return await self.pool.retry_operation(callee) - async def len_async(self) -> int: - async def callee(session): + async def update_main_info(self, ctx_id: str, turn_id: int, crt_at: int, upd_at: int, fw_data: bytes) -> None: + async def callee(session: Session) -> None: query = f""" PRAGMA TablePathPrefix("{self.database}"); - SELECT COUNT(DISTINCT {ExtraFields.storage_key.value}) AS cnt - FROM {self.table_prefix}_{self._CONTEXTS_TABLE} - WHERE {ExtraFields.active_ctx.value} == True; - """ - - result_sets = await session.transaction(SerializableReadWrite()).execute( + DECLARE ${self._current_turn_id_column_name} AS Uint64; + DECLARE ${self._created_at_column_name} AS Uint64; + DECLARE ${self._updated_at_column_name} AS Uint64; + DECLARE ${self._framework_data_column_name} AS String; + UPSERT INTO {self.main_table} ({self._id_column_name}, {self._current_turn_id_column_name}, {self._created_at_column_name}, {self._updated_at_column_name}, {self._framework_data_column_name}) + VALUES ("{ctx_id}", ${self._current_turn_id_column_name}, ${self._created_at_column_name}, ${self._updated_at_column_name}, ${self._framework_data_column_name}); + """ # noqa: E501 + await 
session.transaction(SerializableReadWrite()).execute( await session.prepare(query), - commit_tx=True, + { + f"${self._current_turn_id_column_name}": turn_id, + f"${self._created_at_column_name}": crt_at, + f"${self._updated_at_column_name}": upd_at, + f"${self._framework_data_column_name}": fw_data, + }, + commit_tx=True ) - return result_sets[0].rows[0].cnt if len(result_sets[0].rows) > 0 else 0 - return await self.pool.retry_operation(callee) + await self.pool.retry_operation(callee) - async def clear_async(self, prune_history: bool = False): - async def callee(session): - if prune_history: + async def delete_context(self, ctx_id: str) -> None: + def construct_callee(table_name: str) -> Callable[[Session], Awaitable[None]]: + async def callee(session: Session) -> None: query = f""" PRAGMA TablePathPrefix("{self.database}"); - DELETE FROM {self.table_prefix}_{self._CONTEXTS_TABLE}; - """ - else: - query = f""" - PRAGMA TablePathPrefix("{self.database}"); - UPDATE {self.table_prefix}_{self._CONTEXTS_TABLE} SET {ExtraFields.active_ctx.value}=False; - """ + DELETE FROM {table_name} + WHERE {self._id_column_name} = "{ctx_id}"; + """ # noqa: E501 + await session.transaction(SerializableReadWrite()).execute( + await session.prepare(query), dict(), commit_tx=True + ) - await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - commit_tx=True, - ) + return callee - return await self.pool.retry_operation(callee) + await gather( + self.pool.retry_operation(construct_callee(self.main_table)), + self.pool.retry_operation(construct_callee(self.turns_table)), + self.pool.retry_operation(construct_callee(self.misc_table)) + ) - async def keys_async(self) -> Set[str]: - async def callee(session): + async def load_field_latest(self, ctx_id: str, field_name: str) -> List[Tuple[Hashable, bytes]]: + field_table, key_name, field_config = self._get_config_for_field(field_name) + + async def callee(session: Session) -> List[Tuple[Hashable, bytes]]: + sort, limit, key = "", "", "" + if field_table == self.turns_table: + sort = f"ORDER BY {self._key_column_name} DESC" + if isinstance(field_config.subscript, int): + limit = f"LIMIT {field_config.subscript}" + elif isinstance(field_config.subscript, Set): + keys = ", ".join(self._transform_keys(field_name, field_config.subscript)) + key = f"AND {self._key_column_name} IN ({keys})" query = f""" PRAGMA TablePathPrefix("{self.database}"); - SELECT DISTINCT {ExtraFields.storage_key.value} - FROM {self.table_prefix}_{self._CONTEXTS_TABLE} - WHERE {ExtraFields.active_ctx.value} == True; - """ - + SELECT {self._key_column_name}, {key_name} + FROM {field_table} + WHERE {self._id_column_name} = "{ctx_id}" AND {key_name} IS NOT NULL {key} + {sort} {limit}; + """ # noqa: E501 result_sets = await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - commit_tx=True, + await session.prepare(query), dict(), commit_tx=True ) - return {row[ExtraFields.storage_key.value] for row in result_sets[0].rows} + return [ + (e[self._key_column_name], e[key_name]) for e in result_sets[0].rows + ] if len(result_sets[0].rows) > 0 else list() return await self.pool.retry_operation(callee) - async def _read_pac_ctx(self, storage_key: str) -> Tuple[Dict, Optional[str]]: - async def callee(session): + async def load_field_keys(self, ctx_id: str, field_name: str) -> List[Hashable]: + field_table, key_name, _ = self._get_config_for_field(field_name) + + async def callee(session: Session) -> List[Hashable]: query = f""" PRAGMA 
TablePathPrefix("{self.database}"); - DECLARE ${ExtraFields.storage_key.value} AS Utf8; - SELECT {ExtraFields.id.value}, {self._PACKED_COLUMN}, {ExtraFields.updated_at.value} - FROM {self.table_prefix}_{self._CONTEXTS_TABLE} - WHERE {ExtraFields.storage_key.value} = ${ExtraFields.storage_key.value} AND {ExtraFields.active_ctx.value} == True - ORDER BY {ExtraFields.updated_at.value} DESC - LIMIT 1; + SELECT {self._key_column_name} + FROM {field_table} + WHERE {self._id_column_name} = "{ctx_id}" AND {key_name} IS NOT NULL; """ # noqa: E501 - result_sets = await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - {f"${ExtraFields.storage_key.value}": storage_key}, - commit_tx=True, + await session.prepare(query), dict(), commit_tx=True ) - - if len(result_sets[0].rows) > 0: - return ( - self.serializer.loads(result_sets[0].rows[0][self._PACKED_COLUMN]), - result_sets[0].rows[0][ExtraFields.id.value], - ) - else: - return dict(), None + return [ + e[self._key_column_name] for e in result_sets[0].rows + ] if len(result_sets[0].rows) > 0 else list() return await self.pool.retry_operation(callee) - async def _read_log_ctx(self, keys_limit: Optional[int], field_name: str, id: str) -> Dict: - async def callee(session): - limit = 1001 if keys_limit is None else keys_limit + async def load_field_items(self, ctx_id: str, field_name: str, keys: List[Hashable]) -> List[Tuple[Hashable, bytes]]: + field_table, key_name, _ = self._get_config_for_field(field_name) + async def callee(session: Session) -> List[Tuple[Hashable, bytes]]: query = f""" PRAGMA TablePathPrefix("{self.database}"); - DECLARE ${ExtraFields.id.value} AS Utf8; - DECLARE ${self._FIELD_COLUMN} AS Utf8; - SELECT {self._KEY_COLUMN}, {self._VALUE_COLUMN} - FROM {self.table_prefix}_{self._LOGS_TABLE} - WHERE {ExtraFields.id.value} = ${ExtraFields.id.value} AND {self._FIELD_COLUMN} = ${self._FIELD_COLUMN} - ORDER BY {self._KEY_COLUMN} DESC - LIMIT {limit} + SELECT {self._key_column_name}, {key_name} + FROM {field_table} + WHERE {self._id_column_name} = "{ctx_id}" AND {key_name} IS NOT NULL + AND {self._key_column_name} IN ({', '.join(self._transform_keys(field_name, keys))}); """ # noqa: E501 - - final_offset = 0 - result_sets = None - - result_dict = dict() - while result_sets is None or result_sets[0].truncated: - final_query = f"{query} OFFSET {final_offset};" - result_sets = await session.transaction(SerializableReadWrite()).execute( - await session.prepare(final_query), - {f"${ExtraFields.id.value}": id, f"${self._FIELD_COLUMN}": field_name}, - commit_tx=True, - ) - - if len(result_sets[0].rows) > 0: - for key, value in { - row[self._KEY_COLUMN]: row[self._VALUE_COLUMN] for row in result_sets[0].rows - }.items(): - result_dict[key] = self.serializer.loads(value) - - final_offset += 1000 - - return result_dict + result_sets = await session.transaction(SerializableReadWrite()).execute( + await session.prepare(query), dict(), commit_tx=True + ) + return [ + (e[self._key_column_name], e[key_name]) for e in result_sets[0].rows + ] if len(result_sets[0].rows) > 0 else list() return await self.pool.retry_operation(callee) - async def _write_pac_ctx(self, data: Dict, created: int, updated: int, storage_key: str, id: str): - async def callee(session): + async def update_field_items(self, ctx_id: str, field_name: str, items: List[Tuple[Hashable, bytes]]) -> None: + field_table, key_name, _ = self._get_config_for_field(field_name) + if len(items) == 0: + return + + async def callee(session: Session) -> None: + keys 
= self._transform_keys(field_name, [k for k, _ in items]) + placeholders = {k: f"${key_name}_{i}" for i, (k, v) in enumerate(items) if v is not None} + declarations = "\n".join(f"DECLARE {p} AS String;" for p in placeholders.values()) + values = ", ".join(f"(\"{ctx_id}\", {keys[i]}, {placeholders.get(k, 'NULL')})" for i, (k, _) in enumerate(items)) query = f""" PRAGMA TablePathPrefix("{self.database}"); - DECLARE ${self._PACKED_COLUMN} AS String; - DECLARE ${ExtraFields.id.value} AS Utf8; - DECLARE ${ExtraFields.storage_key.value} AS Utf8; - DECLARE ${ExtraFields.created_at.value} AS Uint64; - DECLARE ${ExtraFields.updated_at.value} AS Uint64; - UPSERT INTO {self.table_prefix}_{self._CONTEXTS_TABLE} ({self._PACKED_COLUMN}, {ExtraFields.storage_key.value}, {ExtraFields.id.value}, {ExtraFields.active_ctx.value}, {ExtraFields.created_at.value}, {ExtraFields.updated_at.value}) - VALUES (${self._PACKED_COLUMN}, ${ExtraFields.storage_key.value}, ${ExtraFields.id.value}, True, ${ExtraFields.created_at.value}, ${ExtraFields.updated_at.value}); + {declarations} + UPSERT INTO {field_table} ({self._id_column_name}, {self._key_column_name}, {key_name}) + VALUES {values}; """ # noqa: E501 - await session.transaction(SerializableReadWrite()).execute( await session.prepare(query), - { - f"${self._PACKED_COLUMN}": self.serializer.dumps(data), - f"${ExtraFields.id.value}": id, - f"${ExtraFields.storage_key.value}": storage_key, - f"${ExtraFields.created_at.value}": created, - f"${ExtraFields.updated_at.value}": updated, - }, - commit_tx=True, + {placeholders[k]: v for k, v in items if k in placeholders.keys()}, + commit_tx=True ) - return await self.pool.retry_operation(callee) + await self.pool.retry_operation(callee) - async def _write_log_ctx(self, data: List[Tuple[str, int, Dict]], updated: int, id: str): - async def callee(session): - for field, key, value in data: + async def clear_all(self) -> None: + def construct_callee(table_name: str) -> Callable[[Session], Awaitable[None]]: + async def callee(session: Session) -> None: query = f""" PRAGMA TablePathPrefix("{self.database}"); - DECLARE ${self._FIELD_COLUMN} AS Utf8; - DECLARE ${self._KEY_COLUMN} AS Uint64; - DECLARE ${self._VALUE_COLUMN} AS String; - DECLARE ${ExtraFields.id.value} AS Utf8; - DECLARE ${ExtraFields.updated_at.value} AS Uint64; - UPSERT INTO {self.table_prefix}_{self._LOGS_TABLE} ({self._FIELD_COLUMN}, {self._KEY_COLUMN}, {self._VALUE_COLUMN}, {ExtraFields.id.value}, {ExtraFields.updated_at.value}) - VALUES (${self._FIELD_COLUMN}, ${self._KEY_COLUMN}, ${self._VALUE_COLUMN}, ${ExtraFields.id.value}, ${ExtraFields.updated_at.value}); + DELETE FROM {table_name}; """ # noqa: E501 - await session.transaction(SerializableReadWrite()).execute( - await session.prepare(query), - { - f"${self._FIELD_COLUMN}": field, - f"${self._KEY_COLUMN}": key, - f"${self._VALUE_COLUMN}": self.serializer.dumps(value), - f"${ExtraFields.id.value}": id, - f"${ExtraFields.updated_at.value}": updated, - }, - commit_tx=True, + await session.prepare(query), dict(), commit_tx=True ) - return await self.pool.retry_operation(callee) - - -async def _init_drive(timeout: int, endpoint: str, database: str, table_name_prefix: str): - """ - Initialize YDB drive if it doesn't exist and connect to it. - - :param timeout: timeout to wait for driver. - :param endpoint: endpoint to connect to. - :param database: database to connect to. - :param table_name_prefix: prefix for all table names. 
- """ - driver = Driver(endpoint=endpoint, database=database) - client_settings = driver.table_client._table_client_settings.with_allow_truncated_result(True) - driver.table_client._table_client_settings = client_settings - await driver.wait(fail_fast=True, timeout=timeout) - - pool = SessionPool(driver, size=10) - - logs_table_name = f"{table_name_prefix}_{YDBContextStorage._LOGS_TABLE}" - if not await _does_table_exist(pool, database, logs_table_name): - await _create_logs_table(pool, database, logs_table_name) - - ctx_table_name = f"{table_name_prefix}_{YDBContextStorage._CONTEXTS_TABLE}" - if not await _does_table_exist(pool, database, ctx_table_name): - await _create_contexts_table(pool, database, ctx_table_name) - - return driver, pool - + return callee -async def _does_table_exist(pool, path, table_name) -> bool: - """ - Check if table exists. - - :param pool: driver session pool. - :param path: path to table being checked. - :param table_name: the table name. - :returns: True if table exists, False otherwise. - """ - - async def callee(session): - await session.describe_table(join(path, table_name)) - - try: - await pool.retry_operation(callee) - return True - except SchemeError: - return False - - -async def _create_contexts_table(pool, path, table_name): - """ - Create CONTEXTS table. - - :param pool: driver session pool. - :param path: path to table being checked. - :param table_name: the table name. - """ - - async def callee(session): - await session.create_table( - "/".join([path, table_name]), - TableDescription() - .with_column(Column(ExtraFields.id.value, PrimitiveType.Utf8)) - .with_column(Column(ExtraFields.storage_key.value, OptionalType(PrimitiveType.Utf8))) - .with_column(Column(ExtraFields.active_ctx.value, OptionalType(PrimitiveType.Bool))) - .with_column(Column(ExtraFields.created_at.value, OptionalType(PrimitiveType.Uint64))) - .with_column(Column(ExtraFields.updated_at.value, OptionalType(PrimitiveType.Uint64))) - .with_column(Column(YDBContextStorage._PACKED_COLUMN, OptionalType(PrimitiveType.String))) - .with_index(TableIndex("context_key_index").with_index_columns(ExtraFields.storage_key.value)) - .with_index(TableIndex("context_active_index").with_index_columns(ExtraFields.active_ctx.value)) - .with_primary_key(ExtraFields.id.value), - ) - - return await pool.retry_operation(callee) - - -async def _create_logs_table(pool, path, table_name): - """ - Create CONTEXTS table. - - :param pool: driver session pool. - :param path: path to table being checked. - :param table_name: the table name. 
- """ - - async def callee(session): - await session.create_table( - "/".join([path, table_name]), - TableDescription() - .with_column(Column(ExtraFields.id.value, PrimitiveType.Utf8)) - .with_column(Column(ExtraFields.updated_at.value, OptionalType(PrimitiveType.Uint64))) - .with_column(Column(YDBContextStorage._FIELD_COLUMN, OptionalType(PrimitiveType.Utf8))) - .with_column(Column(YDBContextStorage._KEY_COLUMN, PrimitiveType.Uint64)) - .with_column(Column(YDBContextStorage._VALUE_COLUMN, OptionalType(PrimitiveType.String))) - .with_index(TableIndex("logs_id_index").with_index_columns(ExtraFields.id.value)) - .with_index(TableIndex("logs_field_index").with_index_columns(YDBContextStorage._FIELD_COLUMN)) - .with_primary_keys( - ExtraFields.id.value, YDBContextStorage._FIELD_COLUMN, YDBContextStorage._KEY_COLUMN - ), + await gather( + self.pool.retry_operation(construct_callee(self.main_table)), + self.pool.retry_operation(construct_callee(self.turns_table)), + self.pool.retry_operation(construct_callee(self.misc_table)) ) - - return await pool.retry_operation(callee) diff --git a/chatsky/utils/testing/cleanup_db.py b/chatsky/utils/testing/cleanup_db.py index f26299e21..d88a85897 100644 --- a/chatsky/utils/testing/cleanup_db.py +++ b/chatsky/utils/testing/cleanup_db.py @@ -5,6 +5,8 @@ including JSON, MongoDB, Pickle, Redis, Shelve, SQL, and YDB databases. """ +from typing import Any + from chatsky.context_storages import ( JSONContextStorage, MongoContextStorage, @@ -38,7 +40,7 @@ async def delete_mongo(storage: MongoContextStorage): """ if not mongo_available: raise Exception("Can't delete mongo database - mongo provider unavailable!") - for collection in [storage._main_table, storage._turns_table, storage._misc_table]: + for collection in [storage.main_table, storage.turns_table, storage.misc_table]: await collection.drop() @@ -51,7 +53,7 @@ async def delete_redis(storage: RedisContextStorage): if not redis_available: raise Exception("Can't delete redis database - redis provider unavailable!") await storage.clear_all() - await storage._redis.close() + await storage.database.aclose() async def delete_sql(storage: SQLContextStorage): @@ -67,7 +69,7 @@ async def delete_sql(storage: SQLContextStorage): if storage.dialect == "mysql" and not mysql_available: raise Exception("Can't delete mysql database - mysql provider unavailable!") async with storage.engine.begin() as conn: - for table in [storage._main_table, storage._turns_table, storage._misc_table]: + for table in [storage.main_table, storage.turns_table, storage.misc_table]: await conn.run_sync(table.drop, storage.engine) @@ -80,8 +82,8 @@ async def delete_ydb(storage: YDBContextStorage): if not ydb_available: raise Exception("Can't delete ydb database - ydb provider unavailable!") - async def callee(session): - for field in [storage._CONTEXTS_TABLE, storage._LOGS_TABLE]: - await session.drop_table("/".join([storage.database, f"{storage.table_prefix}_{field}"])) + async def callee(session: Any) -> None: + for table in [storage.main_table, storage.turns_table, storage.misc_table]: + await session.drop_table("/".join([storage.database, table])) await storage.pool.retry_operation(callee)