fix resource level max_table_nesting and normalizer performance tuning #2026

Merged · 6 commits · Nov 7, 2024
100 changes: 66 additions & 34 deletions dlt/common/normalizers/json/relational.py
@@ -15,7 +15,6 @@
     TColumnName,
     TSimpleRegex,
     DLT_NAME_PREFIX,
-    TTableSchema,
 )
 from dlt.common.schema.utils import (
     column_name_validator,
@@ -100,32 +99,31 @@ def _flatten(
     ) -> Tuple[DictStrAny, Dict[Tuple[str, ...], Sequence[Any]]]:
         out_rec_row: DictStrAny = {}
         out_rec_list: Dict[Tuple[str, ...], Sequence[Any]] = {}
-        schema_naming = self.schema.naming

         def norm_row_dicts(dict_row: StrAny, __r_lvl: int, path: Tuple[str, ...] = ()) -> None:
             for k, v in dict_row.items():
                 if k.strip():
-                    norm_k = schema_naming.normalize_identifier(k)
+                    norm_k = self._normalize_identifier(self.schema, k)
                 else:
                     # for empty keys in the data use _
                     norm_k = self.EMPTY_KEY_IDENTIFIER
                 # if norm_k != k:
                 #     print(f"{k} -> {norm_k}")
                 nested_name = (
-                    norm_k if path == () else schema_naming.shorten_fragments(*path, norm_k)
+                    norm_k if path == () else self._shorten_fragments(self.schema, *path, norm_k)
                 )
                 # for lists and dicts we must check if type is possibly nested
                 if isinstance(v, (dict, list)):
-                    if not self._is_nested_type(
-                        self.schema, table, nested_name, self.max_nesting, __r_lvl
-                    ):
+                    if not self._is_nested_type(self.schema, table, nested_name, __r_lvl):
                         # TODO: if schema contains table {table}__{nested_name} then convert v into single element list
                         if isinstance(v, dict):
                             # flatten the dict more
-                            norm_row_dicts(v, __r_lvl + 1, path + (norm_k,))
+                            norm_row_dicts(v, __r_lvl - 1, path + (norm_k,))
                         else:
                             # pass the list to out_rec_list
-                            out_rec_list[path + (schema_naming.normalize_table_identifier(k),)] = v
+                            out_rec_list[
+                                path + (self._normalize_table_identifier(self.schema, k),)
+                            ] = v
                         continue
                 else:
                     # pass the nested value to out_rec_row
@@ -174,9 +172,9 @@ def _add_row_id(
         flattened_row: DictStrAny,
         parent_row_id: str,
         pos: int,
-        _r_lvl: int,
+        is_root: bool = False,
     ) -> str:
-        if _r_lvl == 0:  # root table
+        if is_root:  # root table
             row_id_type = self._get_root_row_id_type(self.schema, table)
             if row_id_type in ("key_hash", "row_hash"):
                 subset = None
@@ -201,14 +199,14 @@ def _add_row_id(
         flattened_row[self.c_dlt_id] = row_id
         return row_id

-    def _get_propagated_values(self, table: str, row: DictStrAny, _r_lvl: int) -> StrAny:
+    def _get_propagated_values(self, table: str, row: DictStrAny, is_root: bool) -> StrAny:
         extend: DictStrAny = {}

         config = self.propagation_config
         if config:
             # mapping(k:v): propagate property with name "k" as property with name "v" in nested table
             mappings: Dict[TColumnName, TColumnName] = {}
-            if _r_lvl == 0:
+            if is_root:
                 mappings.update(config.get("root") or {})
             if table in (config.get("tables") or {}):
                 mappings.update(config["tables"][table])
@@ -229,7 +227,7 @@ def _normalize_list(
         parent_row_id: Optional[str] = None,
         _r_lvl: int = 0,
     ) -> TNormalizedRowIterator:
-        table = self.schema.naming.shorten_fragments(*parent_path, *ident_path)
+        table = self._shorten_fragments(self.schema, *parent_path, *ident_path)

         for idx, v in enumerate(seq):
             if isinstance(v, dict):
@@ -246,14 +244,14 @@
                     parent_path,
                     parent_row_id,
                     idx,
-                    _r_lvl + 1,
+                    _r_lvl - 1,
                 )
             else:
                 # found non-dict in seq, so wrap it
                 wrap_v = wrap_in_dict(self.c_value, v)
                 DataItemNormalizer._extend_row(extend, wrap_v)
-                self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx, _r_lvl)
-                yield (table, self.schema.naming.shorten_fragments(*parent_path)), wrap_v
+                self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx)
+                yield (table, self._shorten_fragments(self.schema, *parent_path)), wrap_v

     def _normalize_row(
         self,
@@ -264,24 +262,25 @@
         parent_row_id: Optional[str] = None,
         pos: Optional[int] = None,
         _r_lvl: int = 0,
+        is_root: bool = False,
     ) -> TNormalizedRowIterator:
         schema = self.schema
-        table = schema.naming.shorten_fragments(*parent_path, *ident_path)
+        table = self._shorten_fragments(schema, *parent_path, *ident_path)
         # flatten current row and extract all lists to recur into
         flattened_row, lists = self._flatten(table, dict_row, _r_lvl)
         # always extend row
         DataItemNormalizer._extend_row(extend, flattened_row)
         # infer record hash or leave existing primary key if present
         row_id = flattened_row.get(self.c_dlt_id, None)
         if not row_id:
-            row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, _r_lvl)
+            row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, is_root)

         # find fields to propagate to nested tables in config
-        extend.update(self._get_propagated_values(table, flattened_row, _r_lvl))
+        extend.update(self._get_propagated_values(table, flattened_row, is_root))

         # yield parent table first
         should_descend = yield (
-            (table, schema.naming.shorten_fragments(*parent_path)),
+            (table, self._shorten_fragments(schema, *parent_path)),
             flattened_row,
         )
         if should_descend is False:
@@ -295,7 +294,7 @@
                 list_path,
                 parent_path + ident_path,
                 row_id,
-                _r_lvl + 1,
+                _r_lvl - 1,
             )

     def extend_schema(self) -> None:
@@ -361,10 +360,16 @@ def normalize_data_item(
         row = cast(DictStrAny, item)
         # identify load id if loaded data must be processed after loading incrementally
         row[self.c_dlt_load_id] = load_id
+        # get table name and nesting level
+        root_table_name = self._normalize_table_identifier(self.schema, table_name)
+        max_nesting = self._get_table_nesting_level(self.schema, root_table_name, self.max_nesting)
+
         yield from self._normalize_row(
             row,
             {},
-            (self.schema.naming.normalize_table_identifier(table_name),),
+            (root_table_name,),
+            _r_lvl=max_nesting,  # we count backwards
+            is_root=True,
         )

     @classmethod
@@ -422,12 +427,39 @@ def _normalize_prop(
             validator_f=column_name_validator(schema.naming),
         )

+    #
+    # Cached helper methods for all operations that are called often
+    #
+    @staticmethod
+    @lru_cache(maxsize=None)
Review comment (Collaborator, Author):
this adds a considerable speed boost. we could also consider adding caching support on the naming convention itself rather than here, so that all normalizers and other places can benefit; I'm not quite sure there are other places where this gets called as often as here, though.

Review comment (Collaborator):
this is interesting because this function is already cached. are you using the snake_case convention? please look at the underlying code again.
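As a standalone illustration of the pattern under discussion (not dlt code): memoizing a pure function of (schema, identifier) with functools.lru_cache. It assumes the schema object is hashable; SchemaStub and its regex normalization are hypothetical stand-ins for the real Schema and naming convention:

import re
from functools import lru_cache


class SchemaStub:
    """Hypothetical stand-in for dlt's Schema; hashable by object identity."""

    def naming_normalize(self, identifier: str) -> str:
        # toy snake_case-like normalization, standing in for the real naming convention
        return re.sub(r"[^a-z0-9_]", "_", identifier.lower())


@lru_cache(maxsize=None)
def normalize_identifier_cached(schema: SchemaStub, identifier: str) -> str:
    # the cache key is (schema, identifier); identical keys repeated across
    # millions of data rows hit the cache instead of re-running normalization
    return schema.naming_normalize(identifier)


schema = SchemaStub()
assert normalize_identifier_cached(schema, "User Name!") == "user_name_"
normalize_identifier_cached(schema, "User Name!")
assert normalize_identifier_cached.cache_info().hits == 1

The trade-off the thread hints at: caching on the normalizer keeps the win local, while caching inside the naming convention would benefit every caller but keep entries alive for the life of the process.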

+    def _shorten_fragments(schema: Schema, *idents: str) -> str:
+        return schema.naming.shorten_fragments(*idents)
+
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def _normalize_table_identifier(schema: Schema, table_name: str) -> str:
+        return schema.naming.normalize_table_identifier(table_name)
Review comment (Collaborator):
this is also cached already

     @staticmethod
-    def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]:
+    @lru_cache(maxsize=None)
+    def _normalize_identifier(schema: Schema, identifier: str) -> str:
+        return schema.naming.normalize_path(identifier)
+
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def _get_table_nesting_level(
+        schema: Schema, table_name: str, default_nesting: int = 1000
+    ) -> Optional[int]:
         """gets table nesting level, will inherit from parent if not set"""

         table = schema.tables.get(table_name)
-        if table:
-            return table.get("x-normalizer", {}).get("max_nesting")  # type: ignore
-        return None
+        if (
+            table
+            and (max_nesting := cast(int, table.get("x-normalizer", {}).get("max_nesting")))
+            is not None
+        ):
+            return max_nesting
+        return default_nesting
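To make the lookup concrete, here is a small self-contained rendition of the same logic over plain dicts (simplified from the diff; the sample table names are made up). A per-table "x-normalizer" hint wins, and anything else falls back to default_nesting:

from typing import Any, Dict, Optional


def get_table_nesting_level(
    tables: Dict[str, Dict[str, Any]], table_name: str, default_nesting: int = 1000
) -> Optional[int]:
    table = tables.get(table_name)
    if table and (max_nesting := table.get("x-normalizer", {}).get("max_nesting")) is not None:
        return max_nesting  # explicit per-table limit wins
    return default_nesting  # no hint or unknown table: normalizer-wide default


tables = {"issues": {"x-normalizer": {"max_nesting": 1}}, "events": {}}
assert get_table_nesting_level(tables, "issues") == 1  # resource-level limit
assert get_table_nesting_level(tables, "events") == 1000  # no hint: default
assert get_table_nesting_level(tables, "missing") == 1000  # unknown table: default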

     @staticmethod
     @lru_cache(maxsize=None)

@@ -440,18 +472,18 @@ def _get_primary_key(schema: Schema, table_name: str) -> List[str]:
     @staticmethod
     @lru_cache(maxsize=None)
     def _is_nested_type(
-        schema: Schema, table_name: str, field_name: str, max_nesting: int, _r_lvl: int
+        schema: Schema,
+        table_name: str,
+        field_name: str,
+        _r_lvl: int,
     ) -> bool:
         """For those paths the nested objects should be left in place.
         Cache perf: max_nesting < _r_lvl: ~2x faster, full check 10x faster
         """
-        # turn everything at the recursion level into nested type
-        max_table_nesting = DataItemNormalizer._get_table_nesting_level(schema, table_name)
-        if max_table_nesting is not None:
-            max_nesting = max_table_nesting
-
-        assert _r_lvl <= max_nesting
-        if _r_lvl == max_nesting:
+        # nesting level is counted backwards
+        # if we have traversed to or beyond the calculated nesting level, we detect a nested type
+        if _r_lvl <= 0:
             return True

         column: TColumnSchema = None
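The countdown these changes introduce is easier to see in isolation. Below is a toy flattener (not dlt's actual implementation) that seeds a budget with max_nesting and decrements it per level, mirroring how _r_lvl now counts backwards; once the budget reaches zero, whatever is left stays a json blob:

import json
from typing import Any, Dict


def flatten(row: Dict[str, Any], budget: int, prefix: str = "") -> Dict[str, Any]:
    """budget starts at max_nesting and is counted down, mirroring _r_lvl."""
    out: Dict[str, Any] = {}
    for key, value in row.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict) and budget > 0:
            out.update(flatten(value, budget - 1, f"{name}__"))  # descend, spend budget
        elif isinstance(value, dict):
            out[name] = json.dumps(value)  # budget exhausted: keep as nested/json type
        else:
            out[name] = value
    return out


row = {"a": {"b": {"c": 1}}}
assert flatten(row, 2) == {"a__b__c": 1}  # enough budget: fully flattened
assert flatten(row, 1) == {"a__b": '{"c": 1}'}  # countdown hits 0 one level down

Counting down to zero also lets _is_nested_type drop max_nesting from its signature, which presumably shrinks the lru_cache key space and contributes to the speedup.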
42 changes: 24 additions & 18 deletions tests/common/normalizers/test_json_relational.py
@@ -29,7 +29,7 @@ def test_flatten_fix_field_name(norm: RelationalNormalizer) -> None:
         "f 2": [],
         "f!3": {"f4": "a", "f-5": "b", "f*6": {"c": 7, "c v": 8, "c x": []}},
     }
-    flattened_row, lists = norm._flatten("mock_table", row, 0)
+    flattened_row, lists = norm._flatten("mock_table", row, 1000)
     assert "f_1" in flattened_row
     # assert "f_2" in flattened_row
     assert "f_3__f4" in flattened_row
@@ -93,7 +93,7 @@ def test_nested_table_linking(norm: RelationalNormalizer) -> None:
     # request _dlt_root_id propagation
     add_dlt_root_id_propagation(norm)

-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # should have 7 entries (root + level 1 + 3 * list + 2 * object)
     assert len(rows) == 7
     # root elem will not have a root hash if not explicitly added, "extend" is added only to child
@@ -144,7 +144,7 @@ def test_skip_nested_link_when_no_parent(norm: RelationalNormalizer) -> None:
     table__f = new_table("table__f", parent_table_name=None)
     norm.schema.update_table(table__f)

-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     root = next(t for t in rows if t[0][0] == "table")[1]
     # record hash is random for primary keys, not based on their content
     # this is a change introduced in dlt 0.2.0a30
@@ -174,7 +174,7 @@ def test_yields_parents_first(norm: RelationalNormalizer) -> None:
         "f": [{"id": "level1", "l": ["a", "b", "c"], "v": 120, "o": [{"a": 1}, {"a": 2}]}],
         "g": [{"id": "level2_g", "l": ["a"]}],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     tables = list(r[0][0] for r in rows)
     # child tables are always yielded before parent tables
     expected_tables = [
@@ -220,7 +220,7 @@ def test_yields_parent_relation(norm: RelationalNormalizer) -> None:
             }
         ],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # normalizer must return parent table first and move in order of the list elements when yielding child tables
     # the yielding order is fully defined
     expected_parents = [
@@ -281,7 +281,7 @@ def test_list_position(norm: RelationalNormalizer) -> None:
     row: DictStrAny = {
         "f": [{"l": ["a", "b", "c"], "v": 120, "lo": [{"e": "a"}, {"e": "b"}, {"e": "c"}]}]
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # root has no pos
     root = [t for t in rows if t[0][0] == "table"][0][1]
     assert "_dlt_list_idx" not in root
@@ -436,7 +436,7 @@ def test_child_row_deterministic_hash(norm: RelationalNormalizer) -> None:
         "_dlt_id": row_id,
         "f": [{"l": ["a", "b", "c"], "v": 120, "lo": [{"e": "a"}, {"e": "b"}, {"e": "c"}]}],
     }
-    rows = list(norm._normalize_row(row, {}, ("table",)))
+    rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children = [t for t in rows if t[0][0] != "table"]
     # all hashes must be different
     distinct_hashes = set([ch[1]["_dlt_id"] for ch in children])
@@ -455,19 +455,19 @@ def test_child_row_deterministic_hash(norm: RelationalNormalizer) -> None:
     assert f_lo_p2["_dlt_id"] == digest128(f"{el_f['_dlt_id']}_table__f__lo_2", DLT_ID_LENGTH_BYTES)

     # same data with same table and row_id
-    rows_2 = list(norm._normalize_row(row, {}, ("table",)))
+    rows_2 = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children_2 = [t for t in rows_2 if t[0][0] != "table"]
     # corresponding hashes must be identical
     assert all(ch[0][1]["_dlt_id"] == ch[1][1]["_dlt_id"] for ch in zip(children, children_2))

     # change parent table and all child hashes must be different
-    rows_4 = list(norm._normalize_row(row, {}, ("other_table",)))
+    rows_4 = list(norm._normalize_row(row, {}, ("other_table",), _r_lvl=1000, is_root=True))
     children_4 = [t for t in rows_4 if t[0][0] != "other_table"]
     assert all(ch[0][1]["_dlt_id"] != ch[1][1]["_dlt_id"] for ch in zip(children, children_4))

     # change parent hash and all child hashes must be different
     row["_dlt_id"] = uniq_id()
-    rows_3 = list(norm._normalize_row(row, {}, ("table",)))
+    rows_3 = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     children_3 = [t for t in rows_3 if t[0][0] != "table"]
     assert all(ch[0][1]["_dlt_id"] != ch[1][1]["_dlt_id"] for ch in zip(children, children_3))
@@ -483,7 +483,13 @@ def test_keeps_dlt_id(norm: RelationalNormalizer) -> None:
 def test_propagate_hardcoded_context(norm: RelationalNormalizer) -> None:
     row = {"level": 1, "list": ["a", "b", "c"], "comp": [{"_timestamp": "a"}]}
     rows = list(
-        norm._normalize_row(row, {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"}, ("table",))
+        norm._normalize_row(
+            row,
+            {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"},
+            ("table",),
+            _r_lvl=1000,
+            is_root=True,
+        )
     )
     # context is not added to root element
     root = next(t for t in rows if t[0][0] == "table")[1]
@@ -514,7 +520,7 @@ def test_propagates_root_context(norm: RelationalNormalizer) -> None:
         "dependent_list": [1, 2, 3],
         "dependent_objects": [{"vx": "ax"}],
     }
-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # all non-root rows must have:
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     assert all(r[1]["_dlt_root_id"] == "###" for r in non_root)
@@ -553,7 +559,7 @@ def test_propagates_table_context(
     # to reproduce a bug where rows with _dlt_id set were not extended
     row["lvl1"][0]["_dlt_id"] = "row_id_lvl1"  # type: ignore[index]

-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     # _dlt_root_id in all non root
     assert all(r[1]["_dlt_root_id"] == "###" for r in non_root)
@@ -585,7 +591,7 @@ def test_propagates_table_context_to_lists(norm: RelationalNormalizer) -> None:
     prop_config["root"][TColumnName("timestamp")] = TColumnName("_partition_ts")

     row = {"_dlt_id": "###", "timestamp": 12918291.1212, "lvl1": [1, 2, 3, [4, 5, 6]]}
-    normalized_rows = list(norm._normalize_row(row, {}, ("table",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True))
     # _partition_ts == timestamp on all child tables
     non_root = [r for r in normalized_rows if r[0][1] is not None]
     assert all(r[1]["_partition_ts"] == 12918291.1212 for r in non_root)
@@ -598,7 +604,7 @@ def test_removes_normalized_list(norm: RelationalNormalizer) -> None:
     # after normalizing the list that got normalized into child table must be deleted
     row = {"comp": [{"_timestamp": "a"}]}
     # get iterator
-    normalized_rows_i = norm._normalize_row(row, {}, ("table",))
+    normalized_rows_i = norm._normalize_row(row, {}, ("table",), _r_lvl=1000, is_root=True)
     # yield just one item
     root_row = next(normalized_rows_i)
     # root_row = next(r for r in normalized_rows if r[0][1] is None)
@@ -622,7 +628,7 @@ def test_preserves_json_types_list(norm: RelationalNormalizer) -> None:
         )
     )
     row = {"value": ["from", {"json": True}]}
-    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",), _r_lvl=1000, is_root=True))
     # make sure only 1 row is emitted, the list is not normalized
     assert len(normalized_rows) == 1
     # value is kept in root row -> marked as json

@@ -631,7 +637,7 @@ def test_preserves_json_types_list(norm: RelationalNormalizer) -> None:

     # same should work for a list
     row = {"value": ["from", ["json", True]]}  # type: ignore[list-item]
-    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",)))
+    normalized_rows = list(norm._normalize_row(row, {}, ("event_slot",), _r_lvl=1000, is_root=True))
     # make sure only 1 row is emitted, the list is not normalized
     assert len(normalized_rows) == 1
     # value is kept in root row -> marked as json
@@ -884,7 +890,7 @@ def test_caching_perf(norm: RelationalNormalizer) -> None:
     table["x-normalizer"] = {}
     start = time()
     for _ in range(100000):
-        norm._is_nested_type(norm.schema, "test", "field", 0, 0)
+        norm._is_nested_type(norm.schema, "test", "field", 0)
         # norm._get_table_nesting_level(norm.schema, "test")
     print(f"{time() - start}")
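For context, the user-facing behavior this PR fixes is limiting nesting for a single resource rather than the whole source. A minimal sketch, assuming dlt's documented max_table_nesting argument on the resource decorator and a duckdb destination; the pipeline and table names are illustrative:

import dlt


@dlt.resource(max_table_nesting=1)  # objects below one level of nesting stay as json
def issues():
    yield {"id": 1, "author": {"name": "ada", "emails": [{"addr": "a@x.io"}]}}


pipeline = dlt.pipeline(pipeline_name="gh", destination="duckdb", dataset_name="raw")
info = pipeline.run(issues)
# the resource-level hint ends up in the table's x-normalizer settings,
# which _get_table_nesting_level now reads, so "author" flattens one level
# while deeper structures are kept as json
print(info)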
