From f4c0af3cec9e8b88723de7e500d2a441a8229fbe Mon Sep 17 00:00:00 2001 From: PatrickfBraz Date: Mon, 12 Dec 2022 17:08:48 -0300 Subject: [PATCH 1/5] fix(ingest): sql lineage parser impl - makes it possible to return the name of tables and views in their original form --- .../src/datahub/utilities/sql_lineage_parser_impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index a386a000c50cc..f25cb9615cc05 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -110,7 +110,7 @@ def get_tables(self) -> List[str]: logger.error("sql holder not present so cannot get tables") return result for table in self._sql_holder.source_tables: - table_normalized = re.sub(r"^.", "", str(table)) + table_normalized = re.sub(r"^.", "", table.raw_name) result.append(str(table_normalized)) # We need to revert TOKEN replacements From b3d423b9e4138ec2a68f65c361b654d165f95805 Mon Sep 17 00:00:00 2001 From: PatrickfBraz Date: Tue, 13 Dec 2022 10:04:22 -0300 Subject: [PATCH 2/5] refactor(ingest): bigquery-lineage - creates a parameter to allow tables and datasets in uppercase --- .../source/bigquery_v2/bigquery_config.py | 4 + .../ingestion/source/bigquery_v2/lineage.py | 6 +- .../datahub/utilities/bigquery_sql_parser.py | 4 +- .../utilities/sql_lineage_parser_impl.py | 15 ++-- .../src/datahub/utilities/sql_parser.py | 17 +++-- .../tests/unit/test_bigquery_sql_lineage.py | 75 +++++++++++++++++++ 6 files changed, 104 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 94117f26ff794..f74c68b3d45c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -82,6 +82,10 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig): default=True, description="Sql parse view ddl to get lineage.", ) + lineage_sql_parser_use_raw_names: bool = Field( + default=False, + description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False." + ) convert_urns_to_lowercase: bool = Field( default=False, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index faf02649d5b43..e7d069d458cba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -432,7 +432,8 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st # to ensure we only use direct objects accessed for lineage try: parser = BigQuerySQLParser( - e.query, self.config.sql_parser_use_external_process + e.query, self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names ) referenced_objs = set( map(lambda x: x.split(".")[-1], parser.get_tables()) @@ -471,7 +472,8 @@ def parse_view_lineage( if view.view_definition: try: parser = BigQuerySQLParser( - view.view_definition, self.config.sql_parser_use_external_process + view.view_definition, self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names ) tables = parser.get_tables() except Exception as ex: diff --git a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py index ca23a60fab8ae..3819d45aa0a20 100644 --- a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py @@ -9,11 +9,11 @@ class BigQuerySQLParser(SQLParser): parser: SQLParser - def __init__(self, sql_query: str, use_external_process: bool = False) -> None: + def __init__(self, sql_query: str, use_external_process: bool = False, use_raw_names: bool = False) -> None: super().__init__(sql_query) self._parsed_sql_query = self.parse_sql_query(sql_query) - self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process) + self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process, use_raw_names) def parse_sql_query(self, sql_query: str) -> str: sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index f25cb9615cc05..a9c00f6c6aa8b 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -28,9 +28,10 @@ class SqlLineageSQLParserImpl(SQLParser): _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__" _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" - def __init__(self, sql_query: str) -> None: + def __init__(self, sql_query: str, use_raw_names: bool = False) -> None: super().__init__(sql_query) original_sql_query = sql_query + self._use_raw_names = use_raw_names # SqlLineageParser makes mistakes on lateral flatten queries, use the prefix if "lateral flatten" in sql_query: @@ -88,12 +89,12 @@ def __init__(self, sql_query: str) -> None: ] with unittest.mock.patch( - "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", - datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, + "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", + datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, ): with unittest.mock.patch( - "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", - datahub.utilities.sqllineage_patch.add_column_lineage_patch, + "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", + datahub.utilities.sqllineage_patch.add_column_lineage_patch, ): self._stmt_holders = [ LineageAnalyzer().analyze(stmt) for stmt in self._stmt @@ -110,7 +111,9 @@ def get_tables(self) -> List[str]: logger.error("sql holder not present so cannot get tables") return result for table in self._sql_holder.source_tables: - table_normalized = re.sub(r"^.", "", table.raw_name) + table_normalized = re.sub( + r"^.", "", str(table) if not self._use_raw_names else f"{table.schema.raw_name}.{table.raw_name}" + ) result.append(str(table_normalized)) # We need to revert TOKEN replacements diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index 82a6d5ba46b11..43ead519d2f47 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -68,8 +68,8 @@ def get_columns(self) -> List[str]: def sql_lineage_parser_impl_func_wrapper( - queue: Optional[multiprocessing.Queue], - sql_query: str, + queue: Optional[multiprocessing.Queue], + sql_query: str, use_raw_names: bool = False ) -> Optional[Tuple[List[str], List[str], Any]]: """ The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl @@ -78,13 +78,14 @@ def sql_lineage_parser_impl_func_wrapper( the sqllineage module. :param queue: The shared IPC queue on to which the results will be put. :param sql_query: The SQL query to extract the tables & columns from. + :param use_raw_names: Parameter used to ignore sqllineage's default lowercasing. :return: None. """ exception_details: Optional[Tuple[Optional[Type[BaseException]], str]] = None tables: List[str] = [] columns: List[str] = [] try: - parser = SqlLineageSQLParserImpl(sql_query) + parser = SqlLineageSQLParserImpl(sql_query, use_raw_names) tables = parser.get_tables() columns = parser.get_columns() except BaseException: @@ -101,14 +102,15 @@ def sql_lineage_parser_impl_func_wrapper( class SqlLineageSQLParser(SQLParser): - def __init__(self, sql_query: str, use_external_process: bool = True) -> None: + def __init__(self, sql_query: str, use_external_process: bool = True, use_raw_names: bool = False) -> None: super().__init__(sql_query, use_external_process) if use_external_process: self.tables, self.columns = self._get_tables_columns_process_wrapped( - sql_query + sql_query, + use_raw_names ) else: - return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query) + return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query, use_raw_names) if return_tuple is not None: ( self.tables, @@ -118,7 +120,7 @@ def __init__(self, sql_query: str, use_external_process: bool = True) -> None: @staticmethod def _get_tables_columns_process_wrapped( - sql_query: str, + sql_query: str, use_raw_names: bool = False ) -> Tuple[List[str], List[str]]: # Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid # memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help @@ -130,6 +132,7 @@ def _get_tables_columns_process_wrapped( args=( queue, sql_query, + use_raw_names ), ) process.start() diff --git a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py index c2c6d6bd7c868..93f9b7474dabb 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py @@ -20,6 +20,81 @@ def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted(): assert parser.get_tables() == ["project.dataset.src_tbl"] +def test_bigquery_sql_lineage_camel_case_table(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.dataset.CamelCaseTable` + """, + use_raw_names=True + ) + + assert parser.get_tables() == ["project.dataset.CamelCaseTable"] + + +def test_bigquery_sql_lineage_camel_case_dataset(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet.table` + """, + use_raw_names=True + ) + + assert parser.get_tables() == ["project.DataSet.table"] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet.CamelTable` + """, + use_raw_names=True + ) + + assert parser.get_tables() == ["project.DataSet.CamelTable"] + + def test_bigquery_sql_lineage_keyword_data_is_accepted(): parser = BigQuerySQLParser( sql_query=""" From cc020f88da2a5d8c7b596c9c7e0c5be03ca68d6a Mon Sep 17 00:00:00 2001 From: PatrickfBraz Date: Tue, 13 Dec 2022 10:45:11 -0300 Subject: [PATCH 3/5] chore(ingest): bigquery-lineage - applies changes to adjust the lint --- .../source/bigquery_v2/bigquery_config.py | 3 ++- .../ingestion/source/bigquery_v2/lineage.py | 10 ++++++---- .../datahub/utilities/bigquery_sql_parser.py | 11 +++++++++-- .../utilities/sql_lineage_parser_impl.py | 14 +++++++++----- .../src/datahub/utilities/sql_parser.py | 19 +++++++++++++------ .../tests/unit/test_bigquery_sql_lineage.py | 6 +++--- 6 files changed, 42 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index f74c68b3d45c8..6b85acb2628e7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -82,9 +82,10 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig): default=True, description="Sql parse view ddl to get lineage.", ) + lineage_sql_parser_use_raw_names: bool = Field( default=False, - description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False." + description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False.", ) convert_urns_to_lowercase: bool = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index e7d069d458cba..9d00d6edacf7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -432,8 +432,9 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st # to ensure we only use direct objects accessed for lineage try: parser = BigQuerySQLParser( - e.query, self.config.sql_parser_use_external_process, - use_raw_names=self.config.lineage_sql_parser_use_raw_names + e.query, + self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names, ) referenced_objs = set( map(lambda x: x.split(".")[-1], parser.get_tables()) @@ -472,8 +473,9 @@ def parse_view_lineage( if view.view_definition: try: parser = BigQuerySQLParser( - view.view_definition, self.config.sql_parser_use_external_process, - use_raw_names=self.config.lineage_sql_parser_use_raw_names + view.view_definition, + self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names, ) tables = parser.get_tables() except Exception as ex: diff --git a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py index 3819d45aa0a20..ffa3c1f08afb0 100644 --- a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py @@ -9,11 +9,18 @@ class BigQuerySQLParser(SQLParser): parser: SQLParser - def __init__(self, sql_query: str, use_external_process: bool = False, use_raw_names: bool = False) -> None: + def __init__( + self, + sql_query: str, + use_external_process: bool = False, + use_raw_names: bool = False + ) -> None: super().__init__(sql_query) self._parsed_sql_query = self.parse_sql_query(sql_query) - self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process, use_raw_names) + self.parser = SqlLineageSQLParser( + self._parsed_sql_query, use_external_process, use_raw_names + ) def parse_sql_query(self, sql_query: str) -> str: sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index a9c00f6c6aa8b..15804df6d6c62 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -89,12 +89,12 @@ def __init__(self, sql_query: str, use_raw_names: bool = False) -> None: ] with unittest.mock.patch( - "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", - datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, + "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", + datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, ): with unittest.mock.patch( - "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", - datahub.utilities.sqllineage_patch.add_column_lineage_patch, + "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", + datahub.utilities.sqllineage_patch.add_column_lineage_patch, ): self._stmt_holders = [ LineageAnalyzer().analyze(stmt) for stmt in self._stmt @@ -112,7 +112,11 @@ def get_tables(self) -> List[str]: return result for table in self._sql_holder.source_tables: table_normalized = re.sub( - r"^.", "", str(table) if not self._use_raw_names else f"{table.schema.raw_name}.{table.raw_name}" + r"^.", + "", + str(table) + if not self._use_raw_names + else f"{table.schema.raw_name}.{table.raw_name}" ) result.append(str(table_normalized)) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index 43ead519d2f47..74b35d40f5d10 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -68,8 +68,7 @@ def get_columns(self) -> List[str]: def sql_lineage_parser_impl_func_wrapper( - queue: Optional[multiprocessing.Queue], - sql_query: str, use_raw_names: bool = False + queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False ) -> Optional[Tuple[List[str], List[str], Any]]: """ The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl @@ -102,15 +101,23 @@ def sql_lineage_parser_impl_func_wrapper( class SqlLineageSQLParser(SQLParser): - def __init__(self, sql_query: str, use_external_process: bool = True, use_raw_names: bool = False) -> None: + def __init__( + self, + sql_query: str, + use_external_process: bool = True, + use_raw_names: bool = False + ) -> None: super().__init__(sql_query, use_external_process) if use_external_process: self.tables, self.columns = self._get_tables_columns_process_wrapped( + sql_query, use_raw_names + ) + else: + return_tuple = sql_lineage_parser_impl_func_wrapper( + None, sql_query, use_raw_names ) - else: - return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query, use_raw_names) if return_tuple is not None: ( self.tables, @@ -120,7 +127,7 @@ def __init__(self, sql_query: str, use_external_process: bool = True, use_raw_na @staticmethod def _get_tables_columns_process_wrapped( - sql_query: str, use_raw_names: bool = False + sql_query: str, use_raw_names: bool = False ) -> Tuple[List[str], List[str]]: # Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid # memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help diff --git a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py index 93f9b7474dabb..6214021e265ca 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py @@ -39,7 +39,7 @@ def test_bigquery_sql_lineage_camel_case_table(): # this comment will not break sqllineage either FROM `project.dataset.CamelCaseTable` """, - use_raw_names=True + use_raw_names=True, ) assert parser.get_tables() == ["project.dataset.CamelCaseTable"] @@ -64,7 +64,7 @@ def test_bigquery_sql_lineage_camel_case_dataset(): # this comment will not break sqllineage either FROM `project.DataSet.table` """, - use_raw_names=True + use_raw_names=True, ) assert parser.get_tables() == ["project.DataSet.table"] @@ -89,7 +89,7 @@ def test_bigquery_sql_lineage_camel_case_table_and_dataset(): # this comment will not break sqllineage either FROM `project.DataSet.CamelTable` """, - use_raw_names=True + use_raw_names=True, ) assert parser.get_tables() == ["project.DataSet.CamelTable"] From e2db21788d9f2bf74b0d2186a2d2004b5045fd31 Mon Sep 17 00:00:00 2001 From: PatrickfBraz Date: Tue, 13 Dec 2022 11:05:21 -0300 Subject: [PATCH 4/5] chore(ingest): bigquery-lineage - changes to adjust the lint --- .../datahub/utilities/bigquery_sql_parser.py | 8 ++++---- .../utilities/sql_lineage_parser_impl.py | 2 +- .../src/datahub/utilities/sql_parser.py | 20 +++++++------------ 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py index ffa3c1f08afb0..6097fb544a7c1 100644 --- a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py @@ -10,10 +10,10 @@ class BigQuerySQLParser(SQLParser): parser: SQLParser def __init__( - self, - sql_query: str, - use_external_process: bool = False, - use_raw_names: bool = False + self, + sql_query: str, + use_external_process: bool = False, + use_raw_names: bool = False ) -> None: super().__init__(sql_query) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index 15804df6d6c62..d0e38de661dd1 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -116,7 +116,7 @@ def get_tables(self) -> List[str]: "", str(table) if not self._use_raw_names - else f"{table.schema.raw_name}.{table.raw_name}" + else f"{table.schema.raw_name}.{table.raw_name}", ) result.append(str(table_normalized)) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index 74b35d40f5d10..e6ca788c89695 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -68,7 +68,7 @@ def get_columns(self) -> List[str]: def sql_lineage_parser_impl_func_wrapper( - queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False + queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False ) -> Optional[Tuple[List[str], List[str], Any]]: """ The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl @@ -102,10 +102,10 @@ def sql_lineage_parser_impl_func_wrapper( class SqlLineageSQLParser(SQLParser): def __init__( - self, - sql_query: str, - use_external_process: bool = True, - use_raw_names: bool = False + self, + sql_query: str, + use_external_process: bool = True, + use_raw_names: bool = False ) -> None: super().__init__(sql_query, use_external_process) if use_external_process: @@ -114,9 +114,7 @@ def __init__( ) else: return_tuple = sql_lineage_parser_impl_func_wrapper( - None, - sql_query, - use_raw_names + None, sql_query, use_raw_names ) if return_tuple is not None: ( @@ -136,11 +134,7 @@ def _get_tables_columns_process_wrapped( queue: multiprocessing.Queue = Queue() process: multiprocessing.Process = Process( target=sql_lineage_parser_impl_func_wrapper, - args=( - queue, - sql_query, - use_raw_names - ), + args=(queue, sql_query, use_raw_names), ) process.start() tables, columns, exception_details = queue.get(block=True) From c89a9c712c1f5c2a2393ac0793c5602e25081732 Mon Sep 17 00:00:00 2001 From: PatrickfBraz Date: Tue, 13 Dec 2022 17:24:48 -0300 Subject: [PATCH 5/5] test(ingest): bigquery-sql-lineage - adds tests for uppercase table names --- .../datahub/utilities/bigquery_sql_parser.py | 2 +- .../src/datahub/utilities/sql_parser.py | 2 +- .../tests/unit/test_bigquery_sql_lineage.py | 94 +++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py index 6097fb544a7c1..6c8f9f26d9f00 100644 --- a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py @@ -13,7 +13,7 @@ def __init__( self, sql_query: str, use_external_process: bool = False, - use_raw_names: bool = False + use_raw_names: bool = False, ) -> None: super().__init__(sql_query) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index e6ca788c89695..631b7d91ff3d9 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -105,7 +105,7 @@ def __init__( self, sql_query: str, use_external_process: bool = True, - use_raw_names: bool = False + use_raw_names: bool = False, ) -> None: super().__init__(sql_query, use_external_process) if use_external_process: diff --git a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py index 6214021e265ca..f807be747a193 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py @@ -95,6 +95,100 @@ def test_bigquery_sql_lineage_camel_case_table_and_dataset(): assert parser.get_tables() == ["project.DataSet.CamelTable"] +def test_bigquery_sql_lineage_camel_case_table_and_dataset_subquery(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM ( + # this comment will not break sqllineage either + SELECT * FROM `project.DataSet.CamelTable` +) + """, + use_raw_names=True, + ) + + assert parser.get_tables() == ["project.DataSet.CamelTable"] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet1.CamelTable` +INNER JOIN `project.DataSet2.CamelTable2` + ON b.id = a.id +LEFT JOIN `project.DataSet3.CamelTable3` + on c.id = b.id + """, + use_raw_names=True, + ) + + assert parser.get_tables() == [ + "project.DataSet1.CamelTable", + "project.DataSet2.CamelTable2", + "project.DataSet3.CamelTable3", + ] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins_and_subquery(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet1.CamelTable` a +INNER JOIN `project.DataSet2.CamelTable2` b + ON b.id = a.id +LEFT JOIN (SELECT * FROM `project.DataSet3.CamelTable3`) c + ON c.id = b.id + """, + use_raw_names=True, + ) + + assert parser.get_tables() == [ + "project.DataSet1.CamelTable", + "project.DataSet2.CamelTable2", + "project.DataSet3.CamelTable3", + ] + + def test_bigquery_sql_lineage_keyword_data_is_accepted(): parser = BigQuerySQLParser( sql_query="""