
Commit

fix resource level max_table_nesting and normalizer performance tuning (#2026)

* fix max table nesting, updated tests to come

* completely rework tests

* calculate max nesting only once, and count nesting level backwards

* fix normalizer tests in common

* cache shorten fragments (saves about 20-25% of time)

* cache normalizing identifiers (see the caching sketch below)
sh-rp authored Nov 7, 2024
1 parent 9c14458 commit 62f46db
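
The two caching bullets come down to one pattern: the hot naming-convention calls (`shorten_fragments`, `normalize_table_identifier`, `normalize_identifier`) are wrapped in `@staticmethod` + `@lru_cache` helpers keyed on the `Schema` instance plus the identifier, so repeated keys across rows skip re-normalization (see the helpers added near the end of relational.py below). A minimal sketch of the pattern with toy `Naming`/`Schema` stand-ins, not dlt's real classes:

    from functools import lru_cache

    class Naming:
        # toy stand-in for dlt's naming convention
        def normalize_identifier(self, ident: str) -> str:
            return ident.strip().lower().replace(" ", "_")

    class Schema:
        # toy stand-in; hashable by identity, like an object
        # without __eq__/__hash__ overrides
        def __init__(self) -> None:
            self.naming = Naming()

    class Normalizer:
        @staticmethod
        @lru_cache(maxsize=None)
        def _normalize_identifier(schema: Schema, identifier: str) -> str:
            # identical (schema, identifier) pairs hit the cache instead of
            # re-running string normalization for every row
            return schema.naming.normalize_identifier(identifier)

    schema = Schema()
    assert Normalizer._normalize_identifier(schema, "Created At") == "created_at"

The sketch assumes what the diff below also relies on: schema objects hash stably for the duration of a run and their naming convention does not change mid-run. Per the commit message, caching `shorten_fragments` alone saved about 20-25% of normalization time.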
Showing 3 changed files with 284 additions and 392 deletions.
100 changes: 66 additions & 34 deletions dlt/common/normalizers/json/relational.py
@@ -15,7 +15,6 @@
     TColumnName,
     TSimpleRegex,
     DLT_NAME_PREFIX,
-    TTableSchema,
 )
 from dlt.common.schema.utils import (
     column_name_validator,
@@ -100,32 +99,31 @@ def _flatten(
     ) -> Tuple[DictStrAny, Dict[Tuple[str, ...], Sequence[Any]]]:
         out_rec_row: DictStrAny = {}
         out_rec_list: Dict[Tuple[str, ...], Sequence[Any]] = {}
-        schema_naming = self.schema.naming

         def norm_row_dicts(dict_row: StrAny, __r_lvl: int, path: Tuple[str, ...] = ()) -> None:
             for k, v in dict_row.items():
                 if k.strip():
-                    norm_k = schema_naming.normalize_identifier(k)
+                    norm_k = self._normalize_identifier(self.schema, k)
                 else:
                     # for empty keys in the data use _
                     norm_k = self.EMPTY_KEY_IDENTIFIER
                 # if norm_k != k:
                 #     print(f"{k} -> {norm_k}")
                 nested_name = (
-                    norm_k if path == () else schema_naming.shorten_fragments(*path, norm_k)
+                    norm_k if path == () else self._shorten_fragments(self.schema, *path, norm_k)
                 )
                 # for lists and dicts we must check if type is possibly nested
                 if isinstance(v, (dict, list)):
-                    if not self._is_nested_type(
-                        self.schema, table, nested_name, self.max_nesting, __r_lvl
-                    ):
+                    if not self._is_nested_type(self.schema, table, nested_name, __r_lvl):
                         # TODO: if schema contains table {table}__{nested_name} then convert v into single element list
                         if isinstance(v, dict):
                             # flatten the dict more
-                            norm_row_dicts(v, __r_lvl + 1, path + (norm_k,))
+                            norm_row_dicts(v, __r_lvl - 1, path + (norm_k,))
                         else:
                             # pass the list to out_rec_list
-                            out_rec_list[path + (schema_naming.normalize_table_identifier(k),)] = v
+                            out_rec_list[
+                                path + (self._normalize_table_identifier(self.schema, k),)
+                            ] = v
                         continue
                     else:
                         # pass the nested value to out_rec_row
@@ -174,9 +172,9 @@ def _add_row_id(
         flattened_row: DictStrAny,
         parent_row_id: str,
         pos: int,
-        _r_lvl: int,
+        is_root: bool = False,
     ) -> str:
-        if _r_lvl == 0:  # root table
+        if is_root:  # root table
             row_id_type = self._get_root_row_id_type(self.schema, table)
             if row_id_type in ("key_hash", "row_hash"):
                 subset = None
@@ -201,14 +199,14 @@
         flattened_row[self.c_dlt_id] = row_id
         return row_id

-    def _get_propagated_values(self, table: str, row: DictStrAny, _r_lvl: int) -> StrAny:
+    def _get_propagated_values(self, table: str, row: DictStrAny, is_root: bool) -> StrAny:
         extend: DictStrAny = {}

         config = self.propagation_config
         if config:
             # mapping(k:v): propagate property with name "k" as property with name "v" in nested table
             mappings: Dict[TColumnName, TColumnName] = {}
-            if _r_lvl == 0:
+            if is_root:
                 mappings.update(config.get("root") or {})
             if table in (config.get("tables") or {}):
                 mappings.update(config["tables"][table])
@@ -229,7 +227,7 @@ def _normalize_list(
         parent_row_id: Optional[str] = None,
         _r_lvl: int = 0,
     ) -> TNormalizedRowIterator:
-        table = self.schema.naming.shorten_fragments(*parent_path, *ident_path)
+        table = self._shorten_fragments(self.schema, *parent_path, *ident_path)

         for idx, v in enumerate(seq):
             if isinstance(v, dict):
@@ -246,14 +244,14 @@
                     parent_path,
                     parent_row_id,
                     idx,
-                    _r_lvl + 1,
+                    _r_lvl - 1,
                 )
             else:
                 # found non-dict in seq, so wrap it
                 wrap_v = wrap_in_dict(self.c_value, v)
                 DataItemNormalizer._extend_row(extend, wrap_v)
-                self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx, _r_lvl)
-                yield (table, self.schema.naming.shorten_fragments(*parent_path)), wrap_v
+                self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx)
+                yield (table, self._shorten_fragments(self.schema, *parent_path)), wrap_v

     def _normalize_row(
         self,
@@ -264,24 +262,25 @@
         parent_row_id: Optional[str] = None,
         pos: Optional[int] = None,
         _r_lvl: int = 0,
+        is_root: bool = False,
     ) -> TNormalizedRowIterator:
         schema = self.schema
-        table = schema.naming.shorten_fragments(*parent_path, *ident_path)
+        table = self._shorten_fragments(schema, *parent_path, *ident_path)
         # flatten current row and extract all lists to recur into
         flattened_row, lists = self._flatten(table, dict_row, _r_lvl)
         # always extend row
         DataItemNormalizer._extend_row(extend, flattened_row)
         # infer record hash or leave existing primary key if present
         row_id = flattened_row.get(self.c_dlt_id, None)
         if not row_id:
-            row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, _r_lvl)
+            row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, is_root)

         # find fields to propagate to nested tables in config
-        extend.update(self._get_propagated_values(table, flattened_row, _r_lvl))
+        extend.update(self._get_propagated_values(table, flattened_row, is_root))

         # yield parent table first
         should_descend = yield (
-            (table, schema.naming.shorten_fragments(*parent_path)),
+            (table, self._shorten_fragments(schema, *parent_path)),
             flattened_row,
         )
         if should_descend is False:
@@ -295,7 +294,7 @@
                 list_path,
                 parent_path + ident_path,
                 row_id,
-                _r_lvl + 1,
+                _r_lvl - 1,
             )

     def extend_schema(self) -> None:
@@ -361,10 +360,16 @@ def normalize_data_item(
         row = cast(DictStrAny, item)
         # identify load id if loaded data must be processed after loading incrementally
         row[self.c_dlt_load_id] = load_id
+        # get table name and nesting level
+        root_table_name = self._normalize_table_identifier(self.schema, table_name)
+        max_nesting = self._get_table_nesting_level(self.schema, root_table_name, self.max_nesting)
+
         yield from self._normalize_row(
             row,
             {},
-            (self.schema.naming.normalize_table_identifier(table_name),),
+            (root_table_name,),
+            _r_lvl=max_nesting,  # we count backwards
+            is_root=True,
         )

     @classmethod
@@ -422,12 +427,39 @@ def _normalize_prop(
             validator_f=column_name_validator(schema.naming),
         )

+    #
+    # Cached helper methods for all operations that are called often
+    #
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def _shorten_fragments(schema: Schema, *idents: str) -> str:
+        return schema.naming.shorten_fragments(*idents)
+
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def _normalize_table_identifier(schema: Schema, table_name: str) -> str:
+        return schema.naming.normalize_table_identifier(table_name)
+
     @staticmethod
-    def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]:
+    @lru_cache(maxsize=None)
+    def _normalize_identifier(schema: Schema, identifier: str) -> str:
+        return schema.naming.normalize_path(identifier)
+
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def _get_table_nesting_level(
+        schema: Schema, table_name: str, default_nesting: int = 1000
+    ) -> Optional[int]:
+        """gets table nesting level, will inherit from parent if not set"""
+
         table = schema.tables.get(table_name)
-        if table:
-            return table.get("x-normalizer", {}).get("max_nesting")  # type: ignore
-        return None
+        if (
+            table
+            and (max_nesting := cast(int, table.get("x-normalizer", {}).get("max_nesting")))
+            is not None
+        ):
+            return max_nesting
+        return default_nesting

     @staticmethod
     @lru_cache(maxsize=None)
@@ -440,18 +472,18 @@ def _get_primary_key(schema: Schema, table_name: str) -> List[str]:
     @staticmethod
     @lru_cache(maxsize=None)
     def _is_nested_type(
-        schema: Schema, table_name: str, field_name: str, max_nesting: int, _r_lvl: int
+        schema: Schema,
+        table_name: str,
+        field_name: str,
+        _r_lvl: int,
     ) -> bool:
         """For those paths the nested objects should be left in place.
         Cache perf: max_nesting < _r_lvl: ~2x faster, full check 10x faster
         """
-        # turn everything at the recursion level into nested type
-        max_table_nesting = DataItemNormalizer._get_table_nesting_level(schema, table_name)
-        if max_table_nesting is not None:
-            max_nesting = max_table_nesting

-        assert _r_lvl <= max_nesting
-        if _r_lvl == max_nesting:
+        # nesting level is counted backwards
+        # if we have traversed to or beyond the calculated nesting level, we detect a nested type
+        if _r_lvl <= 0:
             return True

         column: TColumnSchema = None
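The behavioral core of the hunks above: `_r_lvl` now starts at the table's maximum nesting level and is decremented on each descent, so the stop-flattening check collapses to `_r_lvl <= 0` and `max_nesting` drops out of the `_is_nested_type` cache key. A toy countdown flattener illustrating the idea (hypothetical, not dlt's implementation):

    from typing import Any, Dict

    def flatten(row: Dict[str, Any], r_lvl: int, prefix: str = "") -> Dict[str, Any]:
        # r_lvl counts backwards: it starts at max_nesting and reaches 0 exactly
        # max_nesting levels deep, where remaining dicts are kept as json
        out: Dict[str, Any] = {}
        for k, v in row.items():
            name = prefix + k
            if isinstance(v, dict) and r_lvl > 0:
                out.update(flatten(v, r_lvl - 1, name + "__"))
            else:
                out[name] = v  # leaf value, or nesting budget exhausted
        return out

    row = {"a": {"b": {"c": 1}}}
    assert flatten(row, 0) == {"a": {"b": {"c": 1}}}  # budget 0: keep whole dict
    assert flatten(row, 1) == {"a__b": {"c": 1}}
    assert flatten(row, 2) == {"a__b__c": 1}

Counting down also means the applicable limit is resolved once per data item in `normalize_data_item` instead of being re-derived from `max_nesting` at every recursion step, which is what the "calculate max nesting only once" bullet refers to.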
42 changes: 24 additions & 18 deletions tests/common/normalizers/test_json_relational.py
@@ -29,7 +29,7 @@ def test_flatten_fix_field_name(norm: RelationalNormalizer) -> None:
         "f 2": [],
         "f!3": {"f4": "a", "f-5": "b", "f*6": {"c": 7, "c v": 8, "c x": []}},
     }
-    flattened_row, lists = norm._flatten("mock_table", row, 0)
+    flattened_row, lists = norm._flatten("mock_table", row, 1000)
     assert "f_1" in flattened_row
     # assert "f_2" in flattened_row
     assert "f_3__f4" in flattened_row
@@ -93,7 +93,7 @@ def test_nested_table_linking(norm: RelationalNormalizer) -> None:
     # request _dlt_root_id propagation
     add_dlt_root_id_propagation(norm)

-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # should have 7 entries (root + level 1 + 3 * list + 2 * object)
     assert len(rows) == 7
     # root elem will not have a root hash if not explicitly added, "extend" is added only to child
@@ -144,7 +144,7 @@ def test_skip_nested_link_when_no_parent(norm: RelationalNormalizer) -> None:
     table__f = new_table("table__f", parent_table_name=None)
     norm.schema.update_table(table__f)

-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     root = next(t for t in rows if t[0][0] == "table")[1]
     # record hash is random for primary keys, not based on their content
     # this is a change introduced in dlt 0.2.0a30
@@ -174,7 +174,7 @@ def test_yields_parents_first(norm: RelationalNormalizer) -> None:
         "f": [{"id": "level1", "l": ["a", "b", "c"], "v": 120, "o": [{"a": 1}, {"a": 2}]}],
         "g": [{"id": "level2_g", "l": ["a"]}],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     tables = list(r[0][0] for r in rows)
     # parent tables are always yielded before child tables
     expected_tables = [
@@ -220,7 +220,7 @@ def test_yields_parent_relation(norm: RelationalNormalizer) -> None:
             }
         ],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # normalizer must return parent table first and move in order of the list elements when yielding child tables
     # the yielding order is fully defined
     expected_parents = [
@@ -281,7 +281,7 @@ def test_list_position(norm: RelationalNormalizer) -> None:
     row: DictStrAny = {
         "f": [{"l": ["a", "b", "c"], "v": 120, "lo": [{"e": "a"}, {"e": "b"}, {"e": "c"}]}]
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # root has no pos
     root = [t for t in rows if t[0][0] == "table"][0][1]
     assert "_dlt_list_idx" not in root
@@ -436,7 +436,7 @@ def test_child_row_deterministic_hash(norm: RelationalNormalizer) -> None:
         "_dlt_id": row_id,
         "f": [{"l": ["a", "b", "c"], "v": 120, "lo": [{"e": "a"}, {"e": "b"}, {"e": "c"}]}],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children = [t for t in rows if t[0][0] != "table"]
     # all hashes must be different
     distinct_hashes = set([ch[1]["_dlt_id"] for ch in children])
@@ -455,19 +455,19 @@ def test_child_row_deterministic_hash(norm: RelationalNormalizer) -> None:
     assert f_lo_p2["_dlt_id"] == digest128(f"{el_f['_dlt_id']}_table__f__lo_2", DLT_ID_LENGTH_BYTES)

     # same data with same table and row_id
-    rows_2 = list(norm._normalize_row(row, {}, ("table",)))
+    rows_2 = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children_2 = [t for t in rows_2 if t[0][0] != "table"]
     # corresponding hashes must be identical
     assert all(ch[0][1]["_dlt_id"] == ch[1][1]["_dlt_id"] for ch in zip(children, children_2))

     # change parent table and all child hashes must be different
-    rows_4 = list(norm._normalize_row(row, {}, ("other_table",)))
+    rows_4 = list(norm._normalize_row(row, {}, ("other_table",), _r_lvl=1000, is_root=True))
     children_4 = [t for t in rows_4 if t[0][0] != "other_table"]
     assert all(ch[0][1]["_dlt_id"] != ch[1][1]["_dlt_id"] for ch in zip(children, children_4))

     # change parent hash and all child hashes must be different
     row["_dlt_id"] = uniq_id()
-    rows_3 = list(norm._normalize_row(row, {}, ("table",)))
+    rows_3 = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children_3 = [t for t in rows_3 if t[0][0] != "table"]
     assert all(ch[0][1]["_dlt_id"] != ch[1][1]["_dlt_id"] for ch in zip(children, children_3))

@@ -483,7 +483,13 @@ def test_keeps_dlt_id(norm: RelationalNormalizer) -> None:
 def test_propagate_hardcoded_context(norm: RelationalNormalizer) -> None:
     row = {"level": 1, "list": ["a", "b", "c"], "comp": [{"_timestamp": "a"}]}
     rows = list(
-        norm._normalize_row(row, {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"}, ("table",))
+        norm._normalize_row(
+            row,
+            {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"},
+            ("table",),
+            _r_lvl=1000,
+            is_root=True,
+        )
     )
     # context is not added to root element
     root = next(t for t in rows if t[0][0] == "table")[1]
@@ -514,7 +520,7 @@ def test_propagates_root_context(norm: RelationalNormalizer) -> None:
         "dependent_list": [1, 2, 3],
         "dependent_objects": [{"vx": "ax"}],
     }
-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # all non-root rows must have:
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     assert all(r[1]["_dlt_root_id"] == "###" for r in non_root)
@@ -553,7 +559,7 @@ def test_propagates_table_context(
     # to reproduce a bug where rows with _dlt_id set were not extended
     row["lvl1"][0]["_dlt_id"] = "row_id_lvl1"  # type: ignore[index]

-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     # _dlt_root_id in all non root
     assert all(r[1]["_dlt_root_id"] == "###" for r in non_root)
@@ -585,7 +591,7 @@ def test_propagates_table_context_to_lists(norm: RelationalNormalizer) -> None:
     prop_config["root"][TColumnName("timestamp")] = TColumnName("_partition_ts")

     row = {"_dlt_id": "###", "timestamp": 12918291.1212, "lvl1": [1, 2, 3, [4, 5, 6]]}
-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # _partition_ts == timestamp on all child tables
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     assert all(r[1]["_partition_ts"] == 12918291.1212 for r in non_root)
@@ -598,7 +604,7 @@ def test_removes_normalized_list(norm: RelationalNormalizer) -> None:
     # after normalizing the list that got normalized into child table must be deleted
     row = {"comp": [{"_timestamp": "a"}]}
     # get iterator
-    normalized_rows_i = norm._normalize_row(row, {}, ("table",))
+    normalized_rows_i = norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True)
     # yield just one item
     root_row = next(normalized_rows_i)
     # root_row = next(r for r in normalized_rows if r[0][1] is None)
@@ -622,7 +628,7 @@ def test_preserves_json_types_list(norm: RelationalNormalizer) -> None:
         )
     )
     row = {"value": ["from", {"json": True}]}
-    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",), _r_lvl=1000, is_root=True))
     # make sure only 1 row is emitted, the list is not normalized
     assert len(normalized_rows) == 1
     # value is kept in root row -> marked as json
@@ -631,7 +637,7 @@ def test_preserves_json_types_list(norm: RelationalNormalizer) -> None:

     # same should work for a list
     row = {"value": ["from", ["json", True]]}  # type: ignore[list-item]
-    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",), _r_lvl=1000, is_root=True))
     # make sure only 1 row is emitted, the list is not normalized
     assert len(normalized_rows) == 1
     # value is kept in root row -> marked as json
@@ -884,7 +890,7 @@ def test_caching_perf(norm: RelationalNormalizer) -> None:
     table["x-normalizer"] = {}
     start = time()
     for _ in range(100000):
-        norm._is_nested_type(norm.schema, "test", "field", 0, 0)
+        norm._is_nested_type(norm.schema, "test", "field", 0)
         # norm._get_table_nesting_level(norm.schema, "test")
     print(f"{time() - start}")

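The fix that gives the commit its title lives in `_get_table_nesting_level` above: it now falls back to a default instead of returning None, so a `max_nesting` stored on a single resource's table under the `x-normalizer` hint takes precedence over the normalizer-wide setting passed in as the default. A standalone sketch of that resolution order, using a plain dict in place of `schema.tables`:

    from typing import Any, Dict, Optional

    def get_table_nesting_level(
        tables: Dict[str, Any], table_name: str, default_nesting: int = 1000
    ) -> Optional[int]:
        # the per-table "x-normalizer"/"max_nesting" hint wins; otherwise the
        # caller-supplied default applies (previously None was returned here,
        # which dropped resource-level max_table_nesting)
        table = tables.get(table_name)
        if table and (nesting := table.get("x-normalizer", {}).get("max_nesting")) is not None:
            return nesting
        return default_nesting

    tables = {"my_resource": {"x-normalizer": {"max_nesting": 1}}}
    assert get_table_nesting_level(tables, "my_resource") == 1     # resource-level limit
    assert get_table_nesting_level(tables, "other_table") == 1000  # normalizer default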
(diff of the third changed file not shown)
