feat(ingest/powerbi): support modified_since, extract_dataset_schema and many more #7519
Conversation
@hsheth2, @mohdsiddique and @jjoyce0510, can you all please help take a look?
I am going through it.
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "public_consumer_price_index"
This should match `table.name`.
assert data_platform_tables[0].name == "public_consumer_price_index"
assert (
    data_platform_tables[0].full_name
    == "hive_metastore.sandbox_revenue.public_consumer_price_index"
)
And this should be `table.full_name`.
) -> SchemaFieldClass:
    if isinstance(field, powerbi_data_classes.Column):
        data_type = field.dataType
        type_class = FIELD_TYPE_MAPPING.get(data_type)
Could you please move FIELD_TYPE_MAPPING to data_classes.py, and set the data_type to the DataHub datatype in data_resolver.py when creating Column and Measure? I believe we don't need to check whether it is a Measure or a Column, since both mostly have the same attributes; the only difference is in how the description is formed.
That's a good recommendation; I will create a datahubDataType attribute.
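A minimal sketch of what the suggested attribute could look like. The type-class names and the mapping keys here are hypothetical stand-ins, not the real DataHub schema classes; the point is that the mapping lives next to the data classes and the DataHub type is resolved once at parse time:

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-ins for DataHub's schema field type classes.
class StringTypeClass: ...
class NumberTypeClass: ...
class BooleanTypeClass: ...

# Mapping from Power BI data types to DataHub type classes; keeping it in
# data_classes.py lets the resolver attach the DataHub type up front.
FIELD_TYPE_MAPPING = {
    "string": StringTypeClass,
    "int64": NumberTypeClass,
    "double": NumberTypeClass,
    "boolean": BooleanTypeClass,
}

@dataclass
class Column:
    name: str
    dataType: str
    # Resolved once, at parse time, instead of at MCP-building time.
    datahubDataType: Optional[type] = None

    def __post_init__(self) -> None:
        if self.datahubDataType is None:
            self.datahubDataType = FIELD_TYPE_MAPPING.get(self.dataType.lower())
```

With this shape, the MCP builder no longer needs to branch on Column vs. Measure just to look up the type.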
@@ -223,12 +292,22 @@ def to_datahub_dataset(
    dataset_mcps: List[MetadataChangeProposalWrapper] = []
    if dataset is None:
        return dataset_mcps
    if (
        self.__config.extract_only_matched_endorsed_dataset is not None
Could you please implement this as an allow/deny pattern? Check workspace_id_pattern for reference; the default should be allow-all.
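For illustration, a minimal re-implementation of the allow/deny idea (the real class is `AllowDenyPattern` in `datahub.configuration.common`; the `endorsement_pattern` name below is hypothetical):

```python
import re
from dataclasses import dataclass, field
from typing import List

@dataclass
class AllowDenyPattern:
    # Defaults allow everything, matching the reviewer's "allow all" default.
    allow: List[str] = field(default_factory=lambda: [".*"])
    deny: List[str] = field(default_factory=list)

    def allowed(self, value: str) -> bool:
        # Deny wins over allow; patterns are anchored at the start via re.match.
        if any(re.match(p, value) for p in self.deny):
            return False
        return any(re.match(p, value) for p in self.allow)

# e.g. a hypothetical endorsed-dataset filter on the source config
endorsement_pattern = AllowDenyPattern(allow=["Certified", "Promoted"])
```

An unset pattern then behaves exactly like today (everything allowed), so the config change stays backward compatible.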
    workspace.scan_result
)
# Fetch endorsements tag if it is enabled from configuration
if self.__config.extract_endorsements_to_tags:
Why did this change? The powerbi source already has code to extract endorsements to tags.
I didn't actually change anything here, just the indentation, because of the workspace for-loop.
def get_allowed_workspaces(self) -> Iterable[powerbi_data_classes.Workspace]:
    all_workspaces = self.powerbi_client.get_workspaces()
def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]:
    if self.source_config.admin_apis_only and self.source_config.modified_since:
Please remove the admin_apis_only condition. A user can grant admin permission to the client credential and still ingest a specific workspace. Please refer to the code of the _get_entity_users method and write this the same way.
Please remove the extra condition handling from here and add it to self.powerbi_client.get_workspaces(); get_workspaces() should itself know whether to return only modified workspaces or all of them.
@@ -191,11 +193,16 @@ def compute_job_id(cls, platform: Optional[str]) -> JobId:

# Default name for everything else
job_name_suffix = "stale_entity_removal"
return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)
unique_suffix = f"_{unique_id}" if unique_id else ""
Could you please add a comment explaining this code change?
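A commented sketch of what the changed helper plausibly does, based on the diff context above (the `unique_id` parameter is the new addition; this is a simplified stand-in, not the exact upstream function):

```python
from typing import NewType, Optional

JobId = NewType("JobId", str)

def compute_job_id(platform: Optional[str], unique_id: Optional[str] = None) -> JobId:
    # Default name for everything else.
    job_name_suffix = "stale_entity_removal"
    # unique_id lets a source keep one checkpoint per sub-scope (e.g. one
    # per Power BI workspace) instead of a single global checkpoint.
    unique_suffix = f"_{unique_id}" if unique_id else ""
    base = f"{platform}_{job_name_suffix}" if platform else job_name_suffix
    return JobId(base + unique_suffix)
```

When `unique_id` is omitted, the job id is byte-for-byte the legacy one, which is what keeps existing checkpoints valid.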
@@ -6,7 +6,7 @@
"aspectName": "corpUserKey",
"aspect": {
  "json": {
    "username": "[email protected]"
    "username": "urn:li:corpuser:users.[email protected]"
The ownership changes should not break the old golden files; the code should be backward compatible.
Force-pushed from aa0dd01 to 5d67541.
@mohdsiddique hi, please help review again; I have made changes according to your comments. Thanks!
Force-pushed from 5d67541 to 7d93e3d.
@@ -47,6 +48,9 @@ class SupportedDataPlatform(Enum):
MS_SQL = DataPlatformPair(
    powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
)
DATABRICK_SQL = DataPlatformPair(
    powerbi_data_platform_name="Databrick", datahub_data_platform_name="databrick"
I haven't found `databrick` as a platform in DataHub, and this mapping is used for upstream lineage. Could you please map it to the proper DataHub platform? Check this documentation: https://datahubproject.io/docs/generated/ingestion/sources/databricks. As per your data-access function Databricks.Catalogs, you might need to map it to unity-catalog.
Updated the mapping to `Databricks` / `databricks`.
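The corrected mapping would then look roughly like this (a sketch: `DataPlatformPair` is reconstructed from the diff context, and whether `databricks` or `unity-catalog` is the right DataHub platform name depends on how the Databricks source is configured, per the linked docs):

```python
from dataclasses import dataclass
from enum import Enum

@dataclass(frozen=True)
class DataPlatformPair:
    powerbi_data_platform_name: str
    datahub_data_platform_name: str

class SupportedDataPlatform(Enum):
    MS_SQL = DataPlatformPair(
        powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
    )
    # Power BI's connector name is "Databricks" (data-access function
    # Databricks.Catalogs); on the DataHub side the platform emitted by the
    # Databricks/Unity Catalog source is assumed here to be "databricks".
    DATABRICKS = DataPlatformPair(
        powerbi_data_platform_name="Databricks",
        datahub_data_platform_name="databricks",
    )
```

Getting the `datahub_data_platform_name` right matters because upstream lineage URNs are built from it; a platform DataHub doesn't know about produces dangling lineage edges.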
    field: Union[powerbi_data_classes.Column, powerbi_data_classes.Measure],
) -> SchemaFieldClass:
    data_type = field.dataType
    if getattr(field, "expression", None):
We could do an isinstance check here instead of an attribute check: for example, if the field is an instance of Column, determine the description from the expression and description fields.
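A small sketch of the isinstance-based approach (simplified `Column`/`Measure` stand-ins; the description format is illustrative, not the exact one used in the source):

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class Column:
    name: str
    dataType: str
    description: Optional[str] = None

@dataclass
class Measure:
    name: str
    dataType: str
    expression: Optional[str] = None
    description: Optional[str] = None

def build_description(field: Union[Column, Measure]) -> Optional[str]:
    # Branch on the concrete type rather than probing with getattr:
    # measures fold their DAX expression into the description.
    if isinstance(field, Measure) and field.expression:
        return f"{field.description or ''} Expression: {field.expression}".strip()
    return field.description
```

The isinstance check also keeps type checkers happy, since `expression` only exists on `Measure`.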
return builder.make_user_urn(user.split("@")[0])
return builder.make_user_urn(f"users.{user}")

def get_dataset_table_schema(
Can we rename this to `to_datahub_schema_field`?
        for tag in (dataset.tags or [""])
    ]
):
    return dataset_mcps
Please add a debug-level log message here, like `Returning empty dataset_mcps as no dataset tag matched with extract_only_matched_endorsed_dataset`.
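The filter-plus-log pattern being asked for, sketched as a standalone helper (the function name and simplified signature are hypothetical; in the real source this lives inline in `to_datahub_dataset`):

```python
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)

def dataset_passes_endorsement_filter(
    dataset_tags: List[str], endorsed_filter: Optional[List[str]]
) -> bool:
    """Return True if the dataset should be ingested."""
    if not endorsed_filter:
        return True  # feature disabled: allow everything
    if not any(tag in endorsed_filter for tag in dataset_tags):
        # Debug-level, so a silent skip is still traceable in verbose runs.
        logger.debug(
            "Returning empty dataset_mcps as no dataset tag matched "
            "with extract_only_matched_endorsed_dataset"
        )
        return False
    return True
```

Logging the skip at debug level keeps normal runs quiet while making "why did my dataset disappear?" diagnosable.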
@@ -241,9 +287,17 @@ def to_datahub_dataset(

logger.debug(f"{Constant.Dataset_URN}={ds_urn}")
# Create datasetProperties mcp
custom_properties = {}
if table.expression:
    custom_properties["expression"] = table.expression
Please add `"expression"` to the Constant class.
@@ -260,7 +314,63 @@ def to_datahub_dataset(
    aspect_name=Constant.STATUS,
    aspect=StatusClass(removed=False),
)
dataset_mcps.extend([info_mcp, status_mcp])
if self.__config.extract_dataset_schema:
Let's have a separate function for this logic.
Please create a function `extract_dataset_schema`, similar to `self.extract_lineage`, and move the schema-generation logic into it.
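A sketch of the suggested refactor, with names simplified and MCPs represented as plain dicts (the real code builds `MetadataChangeProposalWrapper` objects and takes `self`/config arguments):

```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class Table:
    name: str
    columns: List[str]

def extract_dataset_schema(table: Table, ds_urn: str) -> List[Dict[str, Any]]:
    # Mirrors the shape of extract_lineage: one focused helper returns the
    # schema MCPs for a single table.
    schema_fields = [{"fieldPath": c} for c in table.columns]
    return [
        {
            "entityUrn": ds_urn,
            "aspectName": "schemaMetadata",
            "aspect": {"fields": schema_fields},
        }
    ]

def to_datahub_dataset(
    table: Table, ds_urn: str, extract_schema: bool
) -> List[Dict[str, Any]]:
    dataset_mcps: List[Dict[str, Any]] = []
    # ... info_mcp / status_mcp construction elided ...
    if extract_schema:
        dataset_mcps.extend(extract_dataset_schema(table, ds_urn))
    return dataset_mcps
```

Splitting it out keeps `to_datahub_dataset` a flat sequence of "build aspect, append" steps and makes the schema path testable in isolation.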
@@ -183,6 +186,11 @@ def fill_tags() -> None:
    return reports

def get_workspaces(self) -> List[Workspace]:
    if self.__config.modified_since:
        workspaces = self.get_modified_workspaces()
        if workspaces:
Why this if? If get_modified_workspaces returns an empty list, we don't need it.
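A minimal sketch of how `get_workspaces` could own the modified_since decision, with the redundant truthiness check dropped (the client class here is a hypothetical stand-in with canned data instead of real API calls):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Workspace:
    id: str

class PowerBiClient:
    def __init__(
        self,
        modified_since: Optional[str],
        all_ws: List[Workspace],
        modified_ws: List[Workspace],
    ):
        self.modified_since = modified_since
        self._all = all_ws
        self._modified = modified_ws

    def get_modified_workspaces(self) -> List[Workspace]:
        return self._modified  # stand-in for the admin API call

    def get_all_workspaces(self) -> List[Workspace]:
        return self._all

    def get_workspaces(self) -> List[Workspace]:
        # No truthiness check needed: if the modified-since flow returns an
        # empty list, an empty list IS the correct answer.
        if self.modified_since:
            return self.get_modified_workspaces()
        return self.get_all_workspaces()
```

This also matches the earlier review point: callers just call `get_workspaces()` and the client decides between "all" and "modified only".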
@@ -1,12 +1,46 @@
[
  {
Could you please check why user-related MCPs got added? Ideally these should not be ingested. The custom-properties changes in the golden file are expected.
@@ -65,14 +65,48 @@
    "runId": "powerbi-test"
  }
},
{
Same here: users should not get added to existing golden files. It looks like your owner-related changes are breaking the previous configuration.
@aezomz it looks like this one is still unresolved; I think it is the last pending item on this PR.
Same thing; I checked. The JSON is being re-ordered, and I am not sure why.
The same aspect is still emitted before and after this PR for this test.
# As modified_workspaces is not idempotent, we checkpoint each Power BI workspace separately.
# Because job_id is used as a dictionary key, we have to set a new job_id.
# Refer to https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L390
self.stale_entity_removal_handler.set_job_id(workspace.id)
This breaks backwards compatibility for all users of powerbi stateful ingestion
OK, I will make some changes so that it stays backward compatible. I will need help reviewing the logic.
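One backward-compatible shape for this (a sketch with a toy handler class; the real `StaleEntityRemovalHandler` lives in datahub's stateful-ingestion module and the helper function here is hypothetical):

```python
from typing import Optional

class StaleEntityRemovalHandler:
    """Toy stand-in for DataHub's stale-entity-removal handler."""

    def __init__(self) -> None:
        # Historical id: what pre-existing checkpoints are keyed under.
        self.job_id = "powerbi_stale_entity_removal"

    def set_job_id(self, workspace_id: str) -> None:
        self.job_id = f"powerbi_stale_entity_removal_{workspace_id}"

def checkpoint_job_id(
    handler: StaleEntityRemovalHandler,
    modified_since: Optional[str],
    workspace_id: str,
) -> str:
    # Backward compatible: only the opt-in modified_since flow switches to a
    # per-workspace checkpoint; the default flow keeps the legacy job id,
    # so existing users' stored state still matches.
    if modified_since:
        handler.set_job_id(workspace_id)
    return handler.job_id
```

Users who never set modified_since keep the exact job id their old checkpoints were written under, which is the compatibility property the reviewer is asking for.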
metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule (outdated, resolved)
Force-pushed from 7d93e3d to 4449ee2.
Force-pushed from 821b7ef to 12178c2.
@hsheth2 and @mohdsiddique, please help review again. Thanks!
Force-pushed from 12178c2 to 6321159.
Hi @hsheth2, can we get this reviewed again? Thanks.
Could you please make sure the existing golden files do not get updated because of the new code changes? Description-related changes, or the new PowerBI dataset-as-container, are acceptable.
}

def create_scan_job(self, workspace_id: str) -> str:
def create_scan_job(self, workspace_ids: List[str]) -> str:
Please revert this to a single workspace_id: str, so that we can have concurrent asset extraction per workspace at a higher level if needed. For example, see https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py#L1255
This function eventually wraps the ids in a list and sends them to Power BI either way, so it doesn't matter whether you pass one workspace_id or a List of them; in the end they are still sent as a list. I would actually recommend sending a list of workspaces to the scan API: since Power BI accepts a list, it processes them quite quickly too. But if you want to think async, let's make batch_size configurable.
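The configurable-batch-size compromise could be sketched like this (a hypothetical helper; the real scan call, `workspaces/getInfo`, would be made once per batch):

```python
from typing import List

def create_scan_batches(workspace_ids: List[str], batch_size: int) -> List[List[str]]:
    # The Power BI scan API accepts a list of workspace ids per request, so
    # batching lets one call cover many workspaces; batch_size=1 recovers
    # the per-workspace scans needed for concurrent extraction.
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [
        workspace_ids[i : i + batch_size]
        for i in range(0, len(workspace_ids), batch_size)
    ]
```

Both camps are then satisfied by configuration: a large batch size minimizes API calls, batch size 1 enables the Looker-style per-workspace parallelism the reviewer points to.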
        self.reporter, auto_status_aspect(self.get_workunits_internal())
    ),
)
if self.source_config.modified_since:
Is there any reason to skip auto_stale_entity_removal for workspaces returned in the modified_since flow?
@@ -1,34 +1,4 @@
[
  {
Could you please make sure the existing golden file does not get updated because of your code changes? Description-related changes, or the new PowerBI dataset-as-container, are acceptable.
Yep! Checking through. Actually, corpUserKey just has inconsistent JSON ordering. I have ensured the new feature has no impact on existing users as much as possible.
The description and custom-properties updates are new, as are the subtypes. The rest remains the same. You can open the full view and check.
    self.stale_entity_removal_handler
)

yield from auto_stale_entity_removal(
Is there any reason to skip auto_stale_entity_removal for workspaces returned in the modified_since flow?
modified_since only works this way: we create a checkpoint state for each workspace id.
Force-pushed from 6321159 to b4366d7.
@hsheth2 and @mohdsiddique, please help review again. I hope we can close this soon, as the default recipe settings are already backward compatible.
@@ -472,13 +638,20 @@
  }
},
{
  "entityType": "corpuser",
  "entityUrn": "urn:li:corpuser:[email protected]",
@mohdsiddique, same as before. The corpuser was already created here; only the JSON ordering changed.
Codecov Report

@@           Coverage Diff            @@
##           master    #7519    +/-   ##
==========================================
- Coverage   74.90%   67.27%   -7.63%
==========================================
  Files         353      353
  Lines       35395    35586    +191
==========================================
- Hits        26511    23940   -2571
- Misses       8884    11646   +2762

... and 73 files with indirect coverage changes. View the full report in Codecov.
@hsheth2 can you help review again? We should close this soon. Can you help resolve the conflict if all is good?
Force-pushed from 828fb8d to 8b8fc39.
@mohdsiddique I have added view properties and the view subtype to enable the view definition. I have also fixed the user MCP ordering issue; the golden JSON ordering for corpuser will now match OSS. @hsheth2 please review and let's close this. Thanks!
Hi @aezomz
Hi @mohdsiddique, I think that is already done in this PR, which was approved by Joyce.
Once again, please close this soon. Thanks!
@aezomz I'm going to merge this in tonight, assuming CI turns green. One thing I'd like to call out: the changes to stateful ingestion in this PR are definitely experimental, and I can't guarantee that we won't make future changes to stateful ingestion that may affect the implementation here.
…and many more (#7519) Co-authored-by: Harshal Sheth <[email protected]>
Checklist
I think the main improvement is modified_since, which leaves an interesting problem for stateful ingestion.
I tried my best to solve this in the most elegant way, compared to my previous PR, which was rejected.
While I have catered for backward compatibility, the extra config options are entirely optional and will not impact current users. :)