From ea0295bf660c1f846f52a8efefa4a9bee07a0f13 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 29 Jul 2024 16:01:16 +0100 Subject: [PATCH 1/3] feat(cli): Add run-id option to put sub-command --- .../src/datahub/cli/cli_utils.py | 11 ++++++++-- metadata-ingestion/src/datahub/cli/put_cli.py | 20 ++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index b0039b5f87b34..295584340c2bb 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -15,7 +15,7 @@ from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.request_helper import make_curl_command -from datahub.emitter.serialization_helper import post_json_transform +from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, @@ -166,10 +166,11 @@ def post_entity( aspect_value: Dict, cached_session_host: Optional[Tuple[Session, str]] = None, is_async: Optional[str] = "false", + system_metadata: Union[None, SystemMetadataClass] = None, ) -> int: endpoint: str = "/aspects/?action=ingestProposal" - proposal = { + proposal: dict[str, Any] = { "proposal": { "entityType": entity_type, "entityUrn": urn, @@ -182,6 +183,12 @@ def post_entity( }, "async": is_async, } + + if system_metadata is not None: + proposal["proposal"]["systemMetadata"] = json.dumps( + pre_json_transform(system_metadata.to_obj()) + ) + payload = json.dumps(proposal) url = gms_host + endpoint curl_command = make_curl_command(session, "POST", url, payload) diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index 40af54c7c7e2e..d7590157f6ff2 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -6,7 +6,7 @@ from datahub.cli.cli_utils import post_entity from datahub.configuration.config_loader import load_config_file -from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp import MetadataChangeProposalWrapper, SystemMetadataClass from datahub.ingestion.graph.client import get_default_graph from datahub.metadata.schema_classes import ( DataPlatformInfoClass as DataPlatformInfo, @@ -36,9 +36,10 @@ def put() -> None: @click.option("--urn", required=True, type=str) @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) +@click.option("--run-id", type=str, help="Run ID into which we should log the aspect.") @upgrade.check_upgrade @telemetry.with_telemetry() -def aspect(urn: str, aspect: str, aspect_data: str) -> None: +def aspect(urn: str, aspect: str, aspect_data: str, run_id: str) -> None: """Update a single aspect of an entity""" entity_type = guess_entity_type(urn) @@ -56,6 +57,7 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None: aspect_name=aspect, entity_type=entity_type, aspect_value=aspect_obj, + system_metadata=SystemMetadataClass(runId=run_id), ) click.secho(f"Update succeeded with status {status}", fg="green") @@ -82,8 +84,11 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None: help="Logo URL that must be reachable from the DataHub UI.", required=True, ) +@click.option( + "--run-id", type=str, help="Run ID into which we should log the platform." +) def platform( - ctx: click.Context, name: str, display_name: Optional[str], logo: str + ctx: click.Context, name: str, display_name: Optional[str], logo: str, run_id: str ) -> None: """ Create or update a dataplatform entity in DataHub @@ -104,11 +109,12 @@ def platform( logoUrl=logo, ) datahub_graph = get_default_graph() - datahub_graph.emit( - MetadataChangeProposalWrapper( - entityUrn=str(platform_urn), aspect=data_platform_info - ) + mcp = MetadataChangeProposalWrapper( + entityUrn=str(platform_urn), + aspect=data_platform_info, + systemMetadata=SystemMetadataClass(runId=run_id), ) + datahub_graph.emit(mcp) click.echo( f"✅ Successfully wrote data platform metadata for {platform_urn} to DataHub ({datahub_graph})" ) From aa141f6a48b21d82cb5ffea71f758fadfab0470b Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 29 Jul 2024 16:19:07 +0100 Subject: [PATCH 2/3] fix lint --- metadata-ingestion/src/datahub/cli/cli_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 295584340c2bb..6fe6aa9ce7543 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -170,7 +170,7 @@ def post_entity( ) -> int: endpoint: str = "/aspects/?action=ingestProposal" - proposal: dict[str, Any] = { + proposal: Dict[str, Any] = { "proposal": { "entityType": entity_type, "entityUrn": urn, From f5aecd6349c124f0679fec46741b2211335e14e7 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Tue, 30 Jul 2024 18:55:22 +0100 Subject: [PATCH 3/3] fix parameter --- metadata-ingestion/src/datahub/cli/put_cli.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index d7590157f6ff2..989b1a6d02fd0 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import Optional, Union import click from click_default_group import DefaultGroup @@ -36,10 +36,15 @@ def put() -> None: @click.option("--urn", required=True, type=str) @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) -@click.option("--run-id", type=str, help="Run ID into which we should log the aspect.") +@click.option( + "--run-id", + type=str, + required=False, + help="Run ID into which we should log the aspect.", +) @upgrade.check_upgrade @telemetry.with_telemetry() -def aspect(urn: str, aspect: str, aspect_data: str, run_id: str) -> None: +def aspect(urn: str, aspect: str, aspect_data: str, run_id: Optional[str]) -> None: """Update a single aspect of an entity""" entity_type = guess_entity_type(urn) @@ -49,6 +54,10 @@ def aspect(urn: str, aspect: str, aspect_data: str, run_id: str) -> None: client = get_default_graph() + system_metadata: Union[None, SystemMetadataClass] = None + if run_id: + system_metadata = SystemMetadataClass(runId=run_id) + # TODO: Replace with client.emit, requires figuring out the correct subsclass of _Aspect to create from the data status = post_entity( client._session, @@ -57,7 +66,7 @@ def aspect(urn: str, aspect: str, aspect_data: str, run_id: str) -> None: aspect_name=aspect, entity_type=entity_type, aspect_value=aspect_obj, - system_metadata=SystemMetadataClass(runId=run_id), + system_metadata=system_metadata, ) click.secho(f"Update succeeded with status {status}", fg="green")