feat(ingest): powerbi # set dataset_type_mapping to all supported data platform #7598

Closed
16 changes: 12 additions & 4 deletions metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
@@ -23,10 +23,18 @@ source:
     extract_endorsements_to_tags: false
     # dataset_type_mapping is fixed mapping of Power BI datasources type to equivalent Datahub "data platform" dataset
     dataset_type_mapping:
-      PostgreSql: postgres
-      Oracle: oracle
-      Sql: mssql
-      GoogleBigQuery: bigquery
+      PostgreSql:
+        platform_instance: operational_instance
+        env: DEV
+      Oracle:
+        platform_instance: high_performance_production_unit
+        env: PROD
+      Sql:
+        platform_instance: reporting-db
+        env: QA
+      GoogleBigQuery:
+        platform_instance: sn-2
+        env: STAGE

 sink:
   # sink configs

**Collaborator** (commented on the `PostgreSql:` line): The whole purpose is to make the user do LESS. Why would we include this in our sample recipe? Please remove the entire field from our starter recipe.

**Contributor Author:** It is here as a reference for how to set `platform_instance` in a recipe. I will remove it in the next PR, when we deprecate this field.
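For reviewers who want to see what the per-platform detail shape above (`platform_instance`/`env`) deserializes to, here is a minimal sketch using a stand-in pydantic model. `PlatformDetailSketch` is illustrative only; the real model is `PlatformDetail`, added in `config.py` below:

```python
from typing import Dict, Optional

import pydantic


class PlatformDetailSketch(pydantic.BaseModel):
    # Mirrors PlatformDetail in config.py: per-platform lineage settings.
    platform_instance: Optional[str] = None
    env: str = "PROD"


# The recipe's dataset_type_mapping parses into Dict[str, PlatformDetailSketch].
raw = {
    "PostgreSql": {"platform_instance": "operational_instance", "env": "DEV"},
    "GoogleBigQuery": {"platform_instance": "sn-2", "env": "STAGE"},
}
mapping: Dict[str, PlatformDetailSketch] = {
    platform: PlatformDetailSketch.parse_obj(detail)
    for platform, detail in raw.items()
}
assert mapping["PostgreSql"].env == "DEV"
```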
59 changes: 54 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -1,13 +1,14 @@
 import logging
 from dataclasses import dataclass, field as dataclass_field
+from enum import Enum
 from typing import Dict, List, Optional, Union

 import pydantic
 from pydantic import validator
 from pydantic.class_validators import root_validator

 import datahub.emitter.mce_builder as builder
-from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
 from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -105,6 +106,41 @@ class Constant:
     DATASET_WEB_URL = "datasetWebUrl"


+@dataclass
+class DataPlatformPair:
+    datahub_data_platform_name: str
+    powerbi_data_platform_name: str
+
+
+@dataclass
+class DataPlatformTable:
+    name: str
+    full_name: str
+    data_platform_pair: DataPlatformPair
+
+
+class SupportedDataPlatform(Enum):
+    POSTGRES_SQL = DataPlatformPair(
+        powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
+    )
+
+    ORACLE = DataPlatformPair(
+        powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle"
+    )
+
+    SNOWFLAKE = DataPlatformPair(
+        powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake"
+    )
+
+    MS_SQL = DataPlatformPair(
+        powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
+    )
+    GOOGLE_BIGQUERY = DataPlatformPair(
+        powerbi_data_platform_name="GoogleBigQuery",
+        datahub_data_platform_name="bigquery",
+    )
+
+
 @dataclass
 class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
     dashboards_scanned: int = 0

@@ -129,11 +165,21 @@ def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
         self.number_of_workspaces = number_of_workspaces


-@dataclass
-class PlatformDetail:
+def default_for_dataset_type_mapping() -> Dict[str, str]:
+    dict_: dict = {}
+    for item in SupportedDataPlatform:
+        dict_[
+            item.value.powerbi_data_platform_name
+        ] = item.value.datahub_data_platform_name
+
+    return dict_
+
+
+class PlatformDetail(ConfigModel):
     platform_instance: Optional[str] = pydantic.Field(
         default=None,
-        description="DataHub platform instance name. It should be same as you have used in ingestion receipe of DataHub platform ingestion source of particular platform",
+        description="DataHub platform instance name. It should be same as you have used in ingestion recipe of "
+        "DataHub platform ingestion source of particular platform",
     )
     env: str = pydantic.Field(
         default=DEFAULT_ENV,
@@ -171,7 +217,10 @@ class PowerBiDashboardSourceConfig(
     dataset_type_mapping: Union[
         Dict[str, str], Dict[str, PlatformDetail]
     ] = pydantic.Field(
-        description="Mapping of PowerBI datasource type to DataHub supported data-sources. See Quickstart Recipe for mapping"
+        default_factory=default_for_dataset_type_mapping,
+        description="Mapping of PowerBI datasource type to DataHub supported data-sources. "
+        "You can configure platform instance for dataset lineage. "
+        "See Quickstart Recipe for mapping",
     )
     # Azure app client identifier
     client_id: str = pydantic.Field(description="Azure app client identifier")
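As a usage note: with `default_factory` in place, leaving `dataset_type_mapping` out of a recipe should yield the full mapping. A sketch to inspect it (assumes this branch is installed; the expected output is reconstructed from the enum values above):

```python
from datahub.ingestion.source.powerbi.config import SupportedDataPlatform

# Reproduce default_for_dataset_type_mapping as a dict comprehension.
expected = {
    platform.value.powerbi_data_platform_name: platform.value.datahub_data_platform_name
    for platform in SupportedDataPlatform
}
print(expected)
# {'PostgreSQL': 'postgres', 'Oracle': 'oracle', 'Snowflake': 'snowflake',
#  'Sql': 'mssql', 'GoogleBigQuery': 'bigquery'}
```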
@@ -6,7 +6,11 @@

 from lark import Tree

-from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
+from datahub.ingestion.source.powerbi.config import (
+    DataPlatformPair,
+    PowerBiDashboardSourceReport,
+    SupportedDataPlatform,
+)
 from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
 from datahub.ingestion.source.powerbi.m_query.data_classes import (
     TRACE_POWERBI_MQUERY_PARSER,
@@ -18,41 +22,13 @@
 logger = logging.getLogger(__name__)


-@dataclass
-class DataPlatformPair:
-    datahub_data_platform_name: str
-    powerbi_data_platform_name: str
-
-
-@dataclass
-class DataPlatformTable:
-    name: str
-    full_name: str
-    data_platform_pair: DataPlatformPair
-
-
-class SupportedDataPlatform(Enum):
-    POSTGRES_SQL = DataPlatformPair(
-        powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
-    )
-
-    ORACLE = DataPlatformPair(
-        powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle"
-    )
-
-    SNOWFLAKE = DataPlatformPair(
-        powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake"
-    )
-
-    MS_SQL = DataPlatformPair(
-        powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
-    )
-    GOOGLE_BIGQUERY = DataPlatformPair(
-        powerbi_data_platform_name="GoogleBigQuery",
-        datahub_data_platform_name="bigquery",
-    )
-
-
 class AbstractTableFullNameCreator(ABC):
     @abstractmethod
     def get_full_table_names(
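The M-Query resolver now imports these classes from `config.py` instead of keeping its own copies. A minimal sketch of the kind of lookup it performs (assuming this branch; the helper name and matching logic here are illustrative, not the resolver's exact code path):

```python
from typing import Optional

from datahub.ingestion.source.powerbi.config import (
    DataPlatformPair,
    SupportedDataPlatform,
)


def find_platform_pair(powerbi_name: str) -> Optional[DataPlatformPair]:
    # Map a Power BI datasource kind (e.g. "Snowflake") to its DataHub pair.
    for platform in SupportedDataPlatform:
        if platform.value.powerbi_data_platform_name == powerbi_name:
            return platform.value
    return None


pair = find_platform_pair("Snowflake")
assert pair is not None and pair.datahub_data_platform_name == "snowflake"
```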
@@ -950,6 +950,10 @@ def validate_dataset_type_mapping(self):
             if key not in powerbi_data_platforms:
                 raise ValueError(f"PowerBI DataPlatform {key} is not supported")

+        logger.debug(
+            f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}"
+        )
+
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         """
         Datahub Ingestion framework invoke this method
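A self-contained sketch of the validation this hunk extends, including the new debug line. The real version is a method on the source class; this standalone function is for illustration only:

```python
import logging

from datahub.ingestion.source.powerbi.config import SupportedDataPlatform

logger = logging.getLogger(__name__)


def validate_dataset_type_mapping(dataset_type_mapping: dict) -> None:
    # Every key in the recipe must name a supported Power BI platform.
    powerbi_data_platforms = [
        platform.value.powerbi_data_platform_name
        for platform in SupportedDataPlatform
    ]
    for key in dataset_type_mapping:
        if key not in powerbi_data_platforms:
            raise ValueError(f"PowerBI DataPlatform {key} is not supported")

    logger.debug(
        f"Dataset lineage would get ingested for data-platform = {dataset_type_mapping}"
    )
```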
56 changes: 54 additions & 2 deletions metadata-ingestion/tests/integration/powerbi/test_powerbi.py
@@ -1,12 +1,17 @@
 import logging
 import sys
-from typing import Any, Dict
+from typing import Any, Dict, cast
 from unittest import mock

 import pytest
 from freezegun import freeze_time

 from datahub.ingestion.run.pipeline import Pipeline
+from datahub.ingestion.source.powerbi.config import (
+    PowerBiDashboardSourceConfig,
+    SupportedDataPlatform,
+)
+from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource
 from tests.test_helpers import mce_helpers

 FROZEN_TIME = "2022-02-03 07:00:00"
@@ -651,7 +656,7 @@ def test_powerbi_ingest_urn_lower_case(
             },
         }
     )
-
+    pipeline.config
     pipeline.run()
     pipeline.raise_from_status()
     golden_file = "golden_test_lower_case_urn_ingest.json"

**Collaborator** (commented on `pipeline.config`): ?

**Contributor Author:** Deleted.
@@ -978,3 +983,50 @@ def test_workspace_container(
         output_path=tmp_path / "powerbi_container_mces.json",
         golden_path=f"{test_resources_dir}/{mce_out_file}",
     )
+
+
+@freeze_time(FROZEN_TIME)
+@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
+@pytest.mark.integration
+def test_dataset_type_mapping_should_set_to_all(
+    mock_msal, pytestconfig, tmp_path, mock_time, requests_mock
+):
+    """
+    Here we don't need to run the pipeline. We need to verify dataset_type_mapping is set to default dataplatform
+    """
+    register_mock_api(request_mock=requests_mock)
+
+    new_config: dict = {**default_source_config()}
+
+    del new_config["dataset_type_mapping"]
+
+    pipeline = Pipeline.create(
+        {
+            "run_id": "powerbi-test",
+            "source": {
+                "type": "powerbi",
+                "config": {
+                    **new_config,
+                },
+            },
+            "sink": {
+                "type": "file",
+                "config": {
+                    "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json",
+                },
+            },
+        }
+    )
+    source_config: PowerBiDashboardSourceConfig = cast(
+        PowerBiDashboardSource, pipeline.source
+    ).source_config
+    assert source_config.dataset_type_mapping is not None
+
+    # Generate default dataset_type_mapping and compare it with source_config.dataset_type_mapping
+    default_dataset_type_mapping: dict = {}
+    for item in SupportedDataPlatform:
+        default_dataset_type_mapping[
+            item.value.powerbi_data_platform_name
+        ] = item.value.datahub_data_platform_name
+
+    assert default_dataset_type_mapping == source_config.dataset_type_mapping