From 8e66881c7f6bdfacced13b87eab33da945e2154e Mon Sep 17 00:00:00 2001 From: treff7es Date: Thu, 25 Jul 2024 10:35:13 +0200 Subject: [PATCH 1/3] Fixing container creation when there is no folder in path --- .../data_lake_common/data_lake_utils.py | 35 ++++---- .../tests/unit/s3/test_s3_source.py | 85 +++++++++++++++++++ 2 files changed, 103 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py index f594c61f4e5ed..35e40ec2d46cb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py @@ -160,23 +160,24 @@ def create_container_hierarchy( ) return - for folder in parent_folder_path.split("/"): - abs_path = folder - if parent_key: - prefix: str = "" - if isinstance(parent_key, BucketKey): - prefix = parent_key.bucket_name - elif isinstance(parent_key, FolderKey): - prefix = parent_key.folder_abs_path - abs_path = prefix + "/" + folder - folder_key = self.gen_folder_key(abs_path) - yield from self.create_emit_containers( - container_key=folder_key, - name=folder, - sub_types=[DatasetContainerSubTypes.FOLDER], - parent_container_key=parent_key, - ) - parent_key = folder_key + if parent_folder_path: + for folder in parent_folder_path.split("/"): + abs_path = folder + if parent_key: + prefix: str = "" + if isinstance(parent_key, BucketKey): + prefix = parent_key.bucket_name + elif isinstance(parent_key, FolderKey): + prefix = parent_key.folder_abs_path + abs_path = prefix + "/" + folder + folder_key = self.gen_folder_key(abs_path) + yield from self.create_emit_containers( + container_key=folder_key, + name=folder, + sub_types=[DatasetContainerSubTypes.FOLDER], + parent_container_key=parent_key, + ) + parent_key = folder_key assert parent_key is not None yield from add_dataset_to_container(parent_key, dataset_urn) diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index eeee037965efc..d624544a0648c 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -1,5 +1,11 @@ +from typing import List + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.s3.source import partitioned_folder_comparator +from datahub.metadata._schema_classes import MetadataChangeEventClass def test_partition_comparator_numeric_folder_name(): @@ -91,3 +97,82 @@ def test_path_spec_dir_allowed(): path = "s3://my-bucket/my-folder/year=2022/month=10/day=10/" assert path_spec.dir_allowed(path) is False, f"{path} should be denied" + + +def test_container_generation_without_folders(): + cwu = ContainerWUCreator("s3", None, "PROD") + mcps = cwu.create_container_hierarchy( + "s3://my-bucket/my-file.json.gz", "urn:li:dataset:123" + ) + + def container_properties_filter(x: MetadataWorkUnit) -> bool: + assert isinstance(x.metadata, MetadataChangeProposalWrapper) + return x.metadata.aspectName == "containerProperties" + + container_properties:List = list( + filter(container_properties_filter, mcps) + ) + assert len(container_properties) == 1 + assert container_properties[0].metadata.aspect.customProperties == { + "bucket_name": "my-bucket", + "env": "PROD", + "platform": "s3", + } + + +def test_container_generation_with_folder(): + cwu = ContainerWUCreator("s3", None, "PROD") + mcps = cwu.create_container_hierarchy( + "s3://my-bucket/my-dir/my-file.json.gz", "urn:li:dataset:123" + ) + + def container_properties_filter(x: MetadataWorkUnit) -> bool: + assert isinstance(x.metadata, MetadataChangeProposalWrapper) + return x.metadata.aspectName == "containerProperties" + + container_properties:List = list( + filter(container_properties_filter, mcps) + ) + assert len(container_properties) == 2 + assert container_properties[0].metadata.aspect.customProperties == { + "bucket_name": "my-bucket", + "env": "PROD", + "platform": "s3", + } + assert container_properties[1].metadata.aspect.customProperties == { + "env": "PROD", + "folder_abs_path": "my-bucket/my-dir", + "platform": "s3", + } + + +def test_container_generation_with_multiple_folders(): + cwu = ContainerWUCreator("s3", None, "PROD") + mcps = cwu.create_container_hierarchy( + "s3://my-bucket/my-dir/my-dir2/my-file.json.gz", "urn:li:dataset:123" + ) + + def container_properties_filter(x: MetadataWorkUnit) -> bool: + assert isinstance(x.metadata, MetadataChangeProposalWrapper) + return x.metadata.aspectName == "containerProperties" + + container_properties:List = list( + filter(container_properties_filter, mcps) + ) + + assert len(container_properties) == 3 + assert container_properties[0].metadata.aspect.customProperties == { + "bucket_name": "my-bucket", + "env": "PROD", + "platform": "s3", + } + assert container_properties[1].metadata.aspect.customProperties == { + "env": "PROD", + "folder_abs_path": "my-bucket/my-dir", + "platform": "s3", + } + assert container_properties[2].metadata.aspect.customProperties == { + "env": "PROD", + "folder_abs_path": "my-bucket/my-dir/my-dir2", + "platform": "s3", + } From bb72392c1583b43cbd5dd7c028898b45cb00740c Mon Sep 17 00:00:00 2001 From: treff7es Date: Thu, 25 Jul 2024 10:37:50 +0200 Subject: [PATCH 2/3] Fix formatting --- metadata-ingestion/tests/unit/s3/test_s3_source.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index d624544a0648c..9ccf23c6008d4 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -109,9 +109,7 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: assert isinstance(x.metadata, MetadataChangeProposalWrapper) return x.metadata.aspectName == "containerProperties" - container_properties:List = list( - filter(container_properties_filter, mcps) - ) + container_properties: List = list(filter(container_properties_filter, mcps)) assert len(container_properties) == 1 assert container_properties[0].metadata.aspect.customProperties == { "bucket_name": "my-bucket", @@ -130,9 +128,7 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: assert isinstance(x.metadata, MetadataChangeProposalWrapper) return x.metadata.aspectName == "containerProperties" - container_properties:List = list( - filter(container_properties_filter, mcps) - ) + container_properties: List = list(filter(container_properties_filter, mcps)) assert len(container_properties) == 2 assert container_properties[0].metadata.aspect.customProperties == { "bucket_name": "my-bucket", @@ -156,10 +152,8 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: assert isinstance(x.metadata, MetadataChangeProposalWrapper) return x.metadata.aspectName == "containerProperties" - container_properties:List = list( - filter(container_properties_filter, mcps) - ) - + container_properties: List = list(filter(container_properties_filter, mcps)) + assert len(container_properties) == 3 assert container_properties[0].metadata.aspect.customProperties == { "bucket_name": "my-bucket", From 04d993819af83b619e541ee45a0f2c5ee42b3d45 Mon Sep 17 00:00:00 2001 From: treff7es Date: Thu, 25 Jul 2024 10:42:30 +0200 Subject: [PATCH 3/3] Remove unused import --- metadata-ingestion/tests/unit/s3/test_s3_source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index 9ccf23c6008d4..2eb386e39b0e5 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -5,7 +5,6 @@ from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.s3.source import partitioned_folder_comparator -from datahub.metadata._schema_classes import MetadataChangeEventClass def test_partition_comparator_numeric_folder_name():