diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index c2518cd4fc478..b23799b886fde 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -93,6 +93,11 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig): description="Sql parse view ddl to get lineage.", ) + lineage_sql_parser_use_raw_names: bool = Field( + default=False, + description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False.", + ) + convert_urns_to_lowercase: bool = Field( default=False, description="Convert urns to lowercase.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index faf02649d5b43..9d00d6edacf7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -432,7 +432,9 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st # to ensure we only use direct objects accessed for lineage try: parser = BigQuerySQLParser( - e.query, self.config.sql_parser_use_external_process + e.query, + self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names, ) referenced_objs = set( map(lambda x: x.split(".")[-1], parser.get_tables()) @@ -471,7 +473,9 @@ def parse_view_lineage( if view.view_definition: try: parser = BigQuerySQLParser( - view.view_definition, self.config.sql_parser_use_external_process + view.view_definition, + self.config.sql_parser_use_external_process, + use_raw_names=self.config.lineage_sql_parser_use_raw_names, ) tables = parser.get_tables() except Exception as ex: diff --git a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py index ca23a60fab8ae..6c8f9f26d9f00 100644 --- a/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py @@ -9,11 +9,18 @@ class BigQuerySQLParser(SQLParser): parser: SQLParser - def __init__(self, sql_query: str, use_external_process: bool = False) -> None: + def __init__( + self, + sql_query: str, + use_external_process: bool = False, + use_raw_names: bool = False, + ) -> None: super().__init__(sql_query) self._parsed_sql_query = self.parse_sql_query(sql_query) - self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process) + self.parser = SqlLineageSQLParser( + self._parsed_sql_query, use_external_process, use_raw_names + ) def parse_sql_query(self, sql_query: str) -> str: sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index a386a000c50cc..d0e38de661dd1 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -28,9 +28,10 @@ class SqlLineageSQLParserImpl(SQLParser): _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__" _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" - def __init__(self, sql_query: str) -> None: + def __init__(self, sql_query: str, use_raw_names: bool = False) -> None: super().__init__(sql_query) original_sql_query = sql_query + self._use_raw_names = use_raw_names # SqlLineageParser makes mistakes on lateral flatten queries, use the prefix if "lateral flatten" in sql_query: @@ -110,7 +111,13 @@ def get_tables(self) -> List[str]: logger.error("sql holder not present so cannot get tables") return result for table in self._sql_holder.source_tables: - table_normalized = re.sub(r"^.", "", str(table)) + table_normalized = re.sub( + r"^.", + "", + str(table) + if not self._use_raw_names + else f"{table.schema.raw_name}.{table.raw_name}", + ) result.append(str(table_normalized)) # We need to revert TOKEN replacements diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index 82a6d5ba46b11..631b7d91ff3d9 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -68,8 +68,7 @@ def get_columns(self) -> List[str]: def sql_lineage_parser_impl_func_wrapper( - queue: Optional[multiprocessing.Queue], - sql_query: str, + queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False ) -> Optional[Tuple[List[str], List[str], Any]]: """ The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl @@ -78,13 +77,14 @@ def sql_lineage_parser_impl_func_wrapper( the sqllineage module. :param queue: The shared IPC queue on to which the results will be put. :param sql_query: The SQL query to extract the tables & columns from. + :param use_raw_names: Parameter used to ignore sqllineage's default lowercasing. :return: None. """ exception_details: Optional[Tuple[Optional[Type[BaseException]], str]] = None tables: List[str] = [] columns: List[str] = [] try: - parser = SqlLineageSQLParserImpl(sql_query) + parser = SqlLineageSQLParserImpl(sql_query, use_raw_names) tables = parser.get_tables() columns = parser.get_columns() except BaseException: @@ -101,14 +101,21 @@ def sql_lineage_parser_impl_func_wrapper( class SqlLineageSQLParser(SQLParser): - def __init__(self, sql_query: str, use_external_process: bool = True) -> None: + def __init__( + self, + sql_query: str, + use_external_process: bool = True, + use_raw_names: bool = False, + ) -> None: super().__init__(sql_query, use_external_process) if use_external_process: self.tables, self.columns = self._get_tables_columns_process_wrapped( - sql_query + sql_query, use_raw_names ) else: - return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query) + return_tuple = sql_lineage_parser_impl_func_wrapper( + None, sql_query, use_raw_names + ) if return_tuple is not None: ( self.tables, @@ -118,7 +125,7 @@ def __init__(self, sql_query: str, use_external_process: bool = True) -> None: @staticmethod def _get_tables_columns_process_wrapped( - sql_query: str, + sql_query: str, use_raw_names: bool = False ) -> Tuple[List[str], List[str]]: # Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid # memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help @@ -127,10 +134,7 @@ def _get_tables_columns_process_wrapped( queue: multiprocessing.Queue = Queue() process: multiprocessing.Process = Process( target=sql_lineage_parser_impl_func_wrapper, - args=( - queue, - sql_query, - ), + args=(queue, sql_query, use_raw_names), ) process.start() tables, columns, exception_details = queue.get(block=True) diff --git a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py index c2c6d6bd7c868..f807be747a193 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py @@ -20,6 +20,175 @@ def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted(): assert parser.get_tables() == ["project.dataset.src_tbl"] +def test_bigquery_sql_lineage_camel_case_table(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.dataset.CamelCaseTable` + """, + use_raw_names=True, + ) + + assert parser.get_tables() == ["project.dataset.CamelCaseTable"] + + +def test_bigquery_sql_lineage_camel_case_dataset(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet.table` + """, + use_raw_names=True, + ) + + assert parser.get_tables() == ["project.DataSet.table"] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet.CamelTable` + """, + use_raw_names=True, + ) + + assert parser.get_tables() == ["project.DataSet.CamelTable"] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset_subquery(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM ( + # this comment will not break sqllineage either + SELECT * FROM `project.DataSet.CamelTable` +) + """, + use_raw_names=True, + ) + + assert parser.get_tables() == ["project.DataSet.CamelTable"] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet1.CamelTable` +INNER JOIN `project.DataSet2.CamelTable2` + ON b.id = a.id +LEFT JOIN `project.DataSet3.CamelTable3` + on c.id = b.id + """, + use_raw_names=True, + ) + + assert parser.get_tables() == [ + "project.DataSet1.CamelTable", + "project.DataSet2.CamelTable2", + "project.DataSet3.CamelTable3", + ] + + +def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins_and_subquery(): + """ + This test aims to test the parameter to ignore sqllineage lowercasing. + On the BigQuery service, it's possible to use uppercase name un datasets and tables. + The lowercasing, by default, breaks the lineage construction in these cases. + """ + parser = BigQuerySQLParser( + sql_query=""" +/* +HERE IS A STANDARD COMMENT BLOCK +THIS WILL NOT BREAK sqllineage +*/ +CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS +#This, comment will not break sqllineage +SELECT foo, bar +-- this comment will not break sqllineage either +# this comment will not break sqllineage either +FROM `project.DataSet1.CamelTable` a +INNER JOIN `project.DataSet2.CamelTable2` b + ON b.id = a.id +LEFT JOIN (SELECT * FROM `project.DataSet3.CamelTable3`) c + ON c.id = b.id + """, + use_raw_names=True, + ) + + assert parser.get_tables() == [ + "project.DataSet1.CamelTable", + "project.DataSet2.CamelTable2", + "project.DataSet3.CamelTable3", + ] + + def test_bigquery_sql_lineage_keyword_data_is_accepted(): parser = BigQuerySQLParser( sql_query="""