Skip to content

Commit

Permalink
feat(ingestion): powerbi # Amazon Redshift lineage support (datahub-p…
Browse files Browse the repository at this point in the history
…roject#7562)

Co-authored-by: MohdSiddiqueBagwan <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
3 people authored and shirshanka committed Mar 22, 2023
1 parent 022d980 commit e9ce79b
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 20 deletions.
3 changes: 2 additions & 1 deletion metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ PowerBI Source supports M-Query expression for below listed PowerBI Data Sources
4. Microsoft SQL Server
5. Google BigQuery

Native SQL query parsing is only supported for `Snowflake` data-source and only first table from `FROM` clause will be ingested as upstream table. Advance SQL construct like JOIN and SUB-QUERIES in `FROM` clause are not supported.
Native SQL query parsing is supported for the `Snowflake` and `Amazon Redshift` data sources, and only the first table from the `FROM` clause will be ingested as an upstream table. Advanced SQL constructs such as JOINs and sub-queries in the `FROM` clause are not supported.

For example, refer to the native SQL query below. The table `OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_UNIT_TARGET` will be ingested as the upstream table.

Expand Down Expand Up @@ -70,6 +70,7 @@ let
in
#"Added Conditional Column"
```
Use the fully qualified table name in the `FROM` clause, for example `dev.public.category`.
## M-Query Pattern Supported For Lineage Extraction
Let's consider an M-Query that combines two PostgreSQL tables. Such an M-Query can be written using the patterns below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ class SupportedDataPlatform(Enum):
MS_SQL = DataPlatformPair(
powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
)

GOOGLE_BIGQUERY = DataPlatformPair(
powerbi_data_platform_name="GoogleBigQuery",
datahub_data_platform_name="bigquery",
)

AMAZON_REDSHIFT = DataPlatformPair(
powerbi_data_platform_name="AmazonRedshift",
datahub_data_platform_name="redshift",
)


class AbstractTableFullNameCreator(ABC):
@abstractmethod
Expand All @@ -64,6 +70,21 @@ def get_full_table_names(
def get_platform_pair(self) -> DataPlatformPair:
pass

@staticmethod
def get_db_name_from_second_argument(arg_list: Tree) -> Optional[str]:
    """Return the second argument of a data-access function call.

    The token values of ``arg_list`` are passed through
    ``tree_function.remove_whitespaces_from_list`` and then through
    ``tree_function.strip_char_from_list`` with ``char='"'``. The element at
    index 1 of the resulting list is returned as the database name.

    Args:
        arg_list: Parse tree holding the arguments of the function call.

    Returns:
        The second argument, or ``None`` when fewer than two arguments
        remain after the clean-up steps above.
    """
    raw_tokens = tree_function.token_values(arg_list)
    compact_tokens = tree_function.remove_whitespaces_from_list(raw_tokens)
    cleaned_args: List[str] = tree_function.strip_char_from_list(
        values=compact_tokens,
        char='"',
    )

    # Happy path first: anything from two arguments upwards is accepted.
    if len(cleaned_args) >= 2:
        return cleaned_args[1]

    logger.debug(f"Expected minimum 2 arguments, but got {len(cleaned_args)}")
    return None


class AbstractDataAccessMQueryResolver(ABC):
table: Table
Expand Down Expand Up @@ -395,20 +416,15 @@ def two_level_access_pattern(
full_table_names: List[str] = []

logger.debug(
f"Processing PostgreSQL data-access function detail {data_access_func_detail}"
)
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
),
char='"',
f"Processing {self.get_platform_pair().powerbi_data_platform_name} function detail {data_access_func_detail}"
)

if len(arguments) != 2:
logger.debug(f"Expected 2 arguments, but got {len(arguments)}")
return full_table_names

db_name: str = arguments[1]
db_name: Optional[str] = self.get_db_name_from_second_argument(
data_access_func_detail.arg_list
)
if db_name is None:
logger.debug("db_name not found in expression")
return full_table_names # Return empty list

schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
Expand Down Expand Up @@ -563,9 +579,55 @@ def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.GOOGLE_BIGQUERY.value


class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator):
    """Creates ``db.schema.table`` names for Amazon Redshift data-access calls."""

    def get_platform_pair(self) -> DataPlatformPair:
        """Return the PowerBI/DataHub platform pair registered for Amazon Redshift."""
        return SupportedDataPlatform.AMAZON_REDSHIFT.value

    def get_full_table_names(
        self, data_access_func_detail: DataAccessFunctionDetail
    ) -> List[str]:
        """Build the fully qualified table name for one data-access function.

        The database name is the second argument of the function call. The
        schema name is the ``"Name"`` item of the first identifier accessor,
        and the table name is the ``"Name"`` item of the accessor that
        follows it.

        Args:
            data_access_func_detail: Argument list and identifier-accessor
                chain of the data-access function being resolved.

        Returns:
            A one-element list holding ``"<db>.<schema>.<table>"``, or an
            empty list when the database name cannot be read from the
            argument list.
        """
        logger.debug(
            f"Processing AmazonRedshift data-access function detail {data_access_func_detail}"
        )

        database: Optional[str] = self.get_db_name_from_second_argument(
            data_access_func_detail.arg_list
        )
        if database is None:
            return []  # Nothing to report without a database name

        # The first accessor carries the schema; the one after it the table.
        schema_accessor = cast(
            IdentifierAccessor, data_access_func_detail.identifier_accessor
        )
        schema: str = schema_accessor.items["Name"]

        table_accessor = cast(IdentifierAccessor, schema_accessor.next)
        table: str = table_accessor.items["Name"]

        return [f"{database}.{schema}.{table}"]


class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
}
current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE

def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.SNOWFLAKE.value
return self.current_data_platform.value

@staticmethod
def is_native_parsing_supported(data_access_function_name: str) -> bool:
    """Tell whether native-query parsing is available for a data platform.

    Args:
        data_access_function_name: Name looked up among the keys of
            ``SUPPORTED_NATIVE_QUERY_DATA_PLATFORM``.

    Returns:
        ``True`` when the name is a key of that mapping, ``False`` otherwise.
    """
    supported_platforms = (
        NativeQueryTableFullNameCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
    )
    return data_access_function_name in supported_platforms

def get_full_table_names(
self, data_access_func_detail: DataAccessFunctionDetail
Expand All @@ -586,16 +648,18 @@ def get_full_table_names(
data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[0])
)
if (
data_access_tokens[0]
!= SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
):
if not self.is_native_parsing_supported(data_access_tokens[0]):
logger.debug(
f"Unsupported native-query data-platform = {data_access_tokens[0]}"
)
logger.debug(
f"Provided native-query data-platform = {data_access_tokens[0]}"
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
)
logger.debug("Only Snowflake is supported in NativeQuery")
return full_table_names

self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
]
# First argument is the query
sql_query: str = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
Expand Down Expand Up @@ -625,6 +689,7 @@ class FunctionName(Enum):
SNOWFLAKE_DATA_ACCESS = "Snowflake.Databases"
MSSQL_DATA_ACCESS = "Sql.Database"
GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"


class SupportedResolver(Enum):
Expand Down Expand Up @@ -652,6 +717,12 @@ class SupportedResolver(Enum):
GoogleBigQueryTableFullNameCreator,
FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS,
)

AMAZON_REDSHIFT = (
AmazonRedshiftFullNameCreator,
FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
)

NATIVE_QUERY = (
NativeQueryTableFullNameCreator,
FunctionName.NATIVE_QUERY,
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/integration/powerbi/test_m_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
'let \nSource = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\ngcp_billing_Schema = #"gcp-project"{[Name=#"My bq project",Kind="Schema"]}[Data],\nF_GCP_COST_Table = gcp_billing_Schema{[Name="GCP_TABLE",Kind="Table"]}[Data]\nin\nF_GCP_COST_Table',
'let\n Source = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\nuniversal_Schema = #"gcp-project"{[Name="universal",Kind="Schema"]}[Data],\nD_WH_DATE_Table = universal_Schema{[Name="D_WH_DATE",Kind="Table"]}[Data],\n#"Filtered Rows" = Table.SelectRows(D_WH_DATE_Table, each [D_DATE] > #datetime(2019, 9, 10, 0, 0, 0)),\n#"Filtered Rows1" = Table.SelectRows(#"Filtered Rows", each DateTime.IsInPreviousNHours([D_DATE], 87600))\n in \n#"Filtered Rows1"',
'let\n Source = GoogleBigQuery.Database([BillingProject="dwh-prod"]),\ngcp_project = Source{[Name="dwh-prod"]}[Data],\ngcp_billing_Schema = gcp_project {[Name="gcp_billing",Kind="Schema"]}[Data],\nD_GCP_CUSTOM_LABEL_Table = gcp_billing_Schema{[Name="D_GCP_CUSTOM_LABEL",Kind="Table"]}[Data] \n in \n D_GCP_CUSTOM_LABEL_Table',
'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1',
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
]


Expand Down Expand Up @@ -500,3 +502,43 @@ def test_expression_is_none():
)

assert len(data_platform_tables) == 0


def test_redshift_regular_case():
    """Resolve a Redshift table from ``M_QUERIES[21]`` without native-query parsing.

    NOTE(review): index 21 is presumably the ``AmazonRedshift.Database``
    navigation expression in ``M_QUERIES`` -- confirm against the list.
    """
    redshift_table: powerbi_data_classes.Table = powerbi_data_classes.Table(
        expression=M_QUERIES[21],
        name="category",
        full_name="dev.public.category",
    )
    report = PowerBiDashboardSourceReport()

    upstream_tables: List[DataPlatformTable] = parser.get_upstream_tables(
        redshift_table, report, native_query_enabled=False
    )

    # Exactly one upstream table is expected.
    assert len(upstream_tables) == 1
    upstream = upstream_tables[0]

    # The third dot-separated part of the full name is the bare table name.
    assert upstream.name == redshift_table.full_name.split(".")[2]
    assert upstream.full_name == redshift_table.full_name

    actual_platform = upstream.data_platform_pair.powerbi_data_platform_name
    expected_platform = (
        SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
    )
    assert actual_platform == expected_platform


def test_redshift_native_query():
    """Resolve a Redshift table from ``M_QUERIES[22]`` with native-query parsing on.

    NOTE(review): index 22 is presumably the ``Value.NativeQuery`` expression
    wrapping ``AmazonRedshift.Database`` in ``M_QUERIES`` -- confirm against
    the list.
    """
    redshift_table: powerbi_data_classes.Table = powerbi_data_classes.Table(
        expression=M_QUERIES[22],
        name="category",
        full_name="dev.public.category",
    )
    report = PowerBiDashboardSourceReport()

    upstream_tables: List[DataPlatformTable] = parser.get_upstream_tables(
        redshift_table, report, native_query_enabled=True
    )

    # Exactly one upstream table is expected.
    assert len(upstream_tables) == 1
    upstream = upstream_tables[0]

    # The third dot-separated part of the full name is the bare table name.
    assert upstream.name == redshift_table.full_name.split(".")[2]
    assert upstream.full_name == redshift_table.full_name

    actual_platform = upstream.data_platform_pair.powerbi_data_platform_name
    expected_platform = (
        SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
    )
    assert actual_platform == expected_platform

0 comments on commit e9ce79b

Please sign in to comment.