diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 4cbbd9f6db..fa9b537f6c 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -15,7 +15,6 @@ TColumnName, TSimpleRegex, DLT_NAME_PREFIX, - TTableSchema, ) from dlt.common.schema.utils import ( column_name_validator, @@ -96,7 +95,7 @@ def _reset(self) -> None: # self.primary_keys = Dict[str, ] def _flatten( - self, table: str, dict_row: DictStrAny, parent_path: Tuple[str, ...], _r_lvl: int + self, table: str, dict_row: DictStrAny, _r_lvl: int ) -> Tuple[DictStrAny, Dict[Tuple[str, ...], Sequence[Any]]]: out_rec_row: DictStrAny = {} out_rec_list: Dict[Tuple[str, ...], Sequence[Any]] = {} @@ -116,13 +115,11 @@ def norm_row_dicts(dict_row: StrAny, __r_lvl: int, path: Tuple[str, ...] = ()) - ) # for lists and dicts we must check if type is possibly nested if isinstance(v, (dict, list)): - if not self._is_nested_type( - self.schema, table, nested_name, self.max_nesting, parent_path, __r_lvl - ): + if not self._is_nested_type(self.schema, table, nested_name, __r_lvl): # TODO: if schema contains table {table}__{nested_name} then convert v into single element list if isinstance(v, dict): # flatten the dict more - norm_row_dicts(v, __r_lvl + 1, path + (norm_k,)) + norm_row_dicts(v, __r_lvl - 1, path + (norm_k,)) else: # pass the list to out_rec_list out_rec_list[path + (schema_naming.normalize_table_identifier(k),)] = v @@ -174,9 +171,9 @@ def _add_row_id( flattened_row: DictStrAny, parent_row_id: str, pos: int, - _r_lvl: int, + is_root: bool = False, ) -> str: - if _r_lvl == 0: # root table + if is_root: # root table row_id_type = self._get_root_row_id_type(self.schema, table) if row_id_type in ("key_hash", "row_hash"): subset = None @@ -201,14 +198,14 @@ def _add_row_id( flattened_row[self.c_dlt_id] = row_id return row_id - def _get_propagated_values(self, table: str, row: DictStrAny, _r_lvl: int) -> StrAny: + def _get_propagated_values(self, table: str, row: DictStrAny, is_root: bool) -> StrAny: extend: DictStrAny = {} config = self.propagation_config if config: # mapping(k:v): propagate property with name "k" as property with name "v" in nested table mappings: Dict[TColumnName, TColumnName] = {} - if _r_lvl == 0: + if is_root: mappings.update(config.get("root") or {}) if table in (config.get("tables") or {}): mappings.update(config["tables"][table]) @@ -246,13 +243,13 @@ def _normalize_list( parent_path, parent_row_id, idx, - _r_lvl + 1, + _r_lvl - 1, ) else: # found non-dict in seq, so wrap it wrap_v = wrap_in_dict(self.c_value, v) DataItemNormalizer._extend_row(extend, wrap_v) - self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx, _r_lvl) + self._add_row_id(table, wrap_v, wrap_v, parent_row_id, idx) yield (table, self.schema.naming.shorten_fragments(*parent_path)), wrap_v def _normalize_row( @@ -264,20 +261,21 @@ def _normalize_row( parent_row_id: Optional[str] = None, pos: Optional[int] = None, _r_lvl: int = 0, + is_root: bool = False, ) -> TNormalizedRowIterator: schema = self.schema table = schema.naming.shorten_fragments(*parent_path, *ident_path) # flatten current row and extract all lists to recur into - flattened_row, lists = self._flatten(table, dict_row, parent_path, _r_lvl) + flattened_row, lists = self._flatten(table, dict_row, _r_lvl) # always extend row DataItemNormalizer._extend_row(extend, flattened_row) # infer record hash or leave existing primary key if present row_id = flattened_row.get(self.c_dlt_id, None) if not row_id: - row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, _r_lvl) + row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, is_root) # find fields to propagate to nested tables in config - extend.update(self._get_propagated_values(table, flattened_row, _r_lvl)) + extend.update(self._get_propagated_values(table, flattened_row, is_root)) # yield parent table first should_descend = yield ( @@ -295,7 +293,7 @@ def _normalize_row( list_path, parent_path + ident_path, row_id, - _r_lvl + 1, + _r_lvl - 1, ) def extend_schema(self) -> None: @@ -361,10 +359,16 @@ def normalize_data_item( row = cast(DictStrAny, item) # identify load id if loaded data must be processed after loading incrementally row[self.c_dlt_load_id] = load_id + # get table name and nesting level + root_table_name = self.schema.naming.normalize_table_identifier(table_name) + max_nesting = self._get_table_nesting_level(self.schema, root_table_name, self.max_nesting) + yield from self._normalize_row( row, {}, - (self.schema.naming.normalize_table_identifier(table_name),), + (root_table_name,), + _r_lvl=max_nesting, # we count backwards + is_root=True, ) @classmethod @@ -422,27 +426,24 @@ def _normalize_prop( validator_f=column_name_validator(schema.naming), ) + # + # Cached helper methods for all operations that are called often + # @staticmethod + @lru_cache(maxsize=None) def _get_table_nesting_level( - schema: Schema, table_name: str, parent_path: Tuple[str, ...] + schema: Schema, table_name: str, default_nesting: int = 1000 ) -> Optional[int]: """gets table nesting level, will inherit from parent if not set""" - # try go get table directly table = schema.tables.get(table_name) - max_nesting = None - - if table and (max_nesting := cast(int, table.get("x-normalizer", {}).get("max_nesting"))): + if ( + table + and (max_nesting := cast(int, table.get("x-normalizer", {}).get("max_nesting"))) + is not None + ): return max_nesting - - # if table is not found, try to get it from root path - if max_nesting is None and parent_path: - table = schema.tables.get(parent_path[0]) - - if table: - return cast(int, table.get("x-normalizer", {}).get("max_nesting")) - - return None + return default_nesting @staticmethod @lru_cache(maxsize=None) @@ -458,22 +459,15 @@ def _is_nested_type( schema: Schema, table_name: str, field_name: str, - max_nesting: int, - parent_path: Tuple[str, ...], _r_lvl: int, ) -> bool: """For those paths the nested objects should be left in place. Cache perf: max_nesting < _r_lvl: ~2x faster, full check 10x faster """ - # turn everything at the recursion level into nested type - max_table_nesting = DataItemNormalizer._get_table_nesting_level( - schema, table_name, parent_path - ) - if max_table_nesting is not None: - max_nesting = max_table_nesting - assert _r_lvl <= max_nesting - if _r_lvl == max_nesting: + # nesting level is counted backwards + # is we have traversed to or beyond the calculated nesting level, we detect a nested type + if _r_lvl <= 0: return True column: TColumnSchema = None diff --git a/tests/common/normalizers/test_json_relational.py b/tests/common/normalizers/test_json_relational.py index 2a9fe3ecf4..748259cba1 100644 --- a/tests/common/normalizers/test_json_relational.py +++ b/tests/common/normalizers/test_json_relational.py @@ -29,7 +29,7 @@ def test_flatten_fix_field_name(norm: RelationalNormalizer) -> None: "f 2": [], "f!3": {"f4": "a", "f-5": "b", "f*6": {"c": 7, "c v": 8, "c x": []}}, } - flattened_row, lists = norm._flatten("mock_table", row, (), 0) + flattened_row, lists = norm._flatten("mock_table", row, 0) assert "f_1" in flattened_row # assert "f_2" in flattened_row assert "f_3__f4" in flattened_row @@ -62,11 +62,11 @@ def test_preserve_json_value(norm: RelationalNormalizer) -> None: ) ) row_1 = {"value": 1} - flattened_row, _ = norm._flatten("with_json", row_1, (), 0) + flattened_row, _ = norm._flatten("with_json", row_1, 0) assert flattened_row["value"] == 1 row_2 = {"value": {"json": True}} - flattened_row, _ = norm._flatten("with_json", row_2, (), 0) + flattened_row, _ = norm._flatten("with_json", row_2, 0) assert flattened_row["value"] == row_2["value"] # json value is not flattened assert "value__json" not in flattened_row @@ -78,11 +78,11 @@ def test_preserve_json_value_with_hint(norm: RelationalNormalizer) -> None: norm.schema._compile_settings() row_1 = {"value": 1} - flattened_row, _ = norm._flatten("any_table", row_1, (), 0) + flattened_row, _ = norm._flatten("any_table", row_1, 0) assert flattened_row["value"] == 1 row_2 = {"value": {"json": True}} - flattened_row, _ = norm._flatten("any_table", row_2, (), 0) + flattened_row, _ = norm._flatten("any_table", row_2, 0) assert flattened_row["value"] == row_2["value"] # json value is not flattened assert "value__json" not in flattened_row @@ -884,7 +884,7 @@ def test_caching_perf(norm: RelationalNormalizer) -> None: table["x-normalizer"] = {} start = time() for _ in range(100000): - norm._is_nested_type(norm.schema, "test", "field", 0, (), 0) + norm._is_nested_type(norm.schema, "test", "field", 0, 0) # norm._get_table_nesting_level(norm.schema, "test") print(f"{time() - start}")