Skip to content

Commit

Permalink
refactor(ingest): bigquery-lineage - allow tables and datasets in upp…
Browse files Browse the repository at this point in the history
…ercase (#6739)
  • Loading branch information
PatrickfBraz authored Dec 14, 2022
1 parent 68fd802 commit f0a3719
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
description="Sql parse view ddl to get lineage.",
)

lineage_sql_parser_use_raw_names: bool = Field(
default=False,
description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False.",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
# to ensure we only use direct objects accessed for lineage
try:
parser = BigQuerySQLParser(
e.query, self.config.sql_parser_use_external_process
e.query,
self.config.sql_parser_use_external_process,
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
Expand Down Expand Up @@ -471,7 +473,9 @@ def parse_view_lineage(
if view.view_definition:
try:
parser = BigQuerySQLParser(
view.view_definition, self.config.sql_parser_use_external_process
view.view_definition,
self.config.sql_parser_use_external_process,
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
)
tables = parser.get_tables()
except Exception as ex:
Expand Down
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@
class BigQuerySQLParser(SQLParser):
parser: SQLParser

def __init__(self, sql_query: str, use_external_process: bool = False) -> None:
def __init__(
self,
sql_query: str,
use_external_process: bool = False,
use_raw_names: bool = False,
) -> None:
super().__init__(sql_query)

self._parsed_sql_query = self.parse_sql_query(sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process)
self.parser = SqlLineageSQLParser(
self._parsed_sql_query, use_external_process, use_raw_names
)

def parse_sql_query(self, sql_query: str) -> str:
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class SqlLineageSQLParserImpl(SQLParser):
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

def __init__(self, sql_query: str) -> None:
def __init__(self, sql_query: str, use_raw_names: bool = False) -> None:
super().__init__(sql_query)
original_sql_query = sql_query
self._use_raw_names = use_raw_names

# SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
if "lateral flatten" in sql_query:
Expand Down Expand Up @@ -110,7 +111,13 @@ def get_tables(self) -> List[str]:
logger.error("sql holder not present so cannot get tables")
return result
for table in self._sql_holder.source_tables:
table_normalized = re.sub(r"^<default>.", "", str(table))
table_normalized = re.sub(
r"^<default>.",
"",
str(table)
if not self._use_raw_names
else f"{table.schema.raw_name}.{table.raw_name}",
)
result.append(str(table_normalized))

# We need to revert TOKEN replacements
Expand Down
26 changes: 15 additions & 11 deletions metadata-ingestion/src/datahub/utilities/sql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ def get_columns(self) -> List[str]:


def sql_lineage_parser_impl_func_wrapper(
queue: Optional[multiprocessing.Queue],
sql_query: str,
queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False
) -> Optional[Tuple[List[str], List[str], Any]]:
"""
The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl
Expand All @@ -78,13 +77,14 @@ def sql_lineage_parser_impl_func_wrapper(
the sqllineage module.
:param queue: The shared IPC queue on to which the results will be put.
:param sql_query: The SQL query to extract the tables & columns from.
:param use_raw_names: Parameter used to ignore sqllineage's default lowercasing.
:return: None.
"""
exception_details: Optional[Tuple[Optional[Type[BaseException]], str]] = None
tables: List[str] = []
columns: List[str] = []
try:
parser = SqlLineageSQLParserImpl(sql_query)
parser = SqlLineageSQLParserImpl(sql_query, use_raw_names)
tables = parser.get_tables()
columns = parser.get_columns()
except BaseException:
Expand All @@ -101,14 +101,21 @@ def sql_lineage_parser_impl_func_wrapper(


class SqlLineageSQLParser(SQLParser):
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
def __init__(
self,
sql_query: str,
use_external_process: bool = True,
use_raw_names: bool = False,
) -> None:
super().__init__(sql_query, use_external_process)
if use_external_process:
self.tables, self.columns = self._get_tables_columns_process_wrapped(
sql_query
sql_query, use_raw_names
)
else:
return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query)
return_tuple = sql_lineage_parser_impl_func_wrapper(
None, sql_query, use_raw_names
)
if return_tuple is not None:
(
self.tables,
Expand All @@ -118,7 +125,7 @@ def __init__(self, sql_query: str, use_external_process: bool = True) -> None:

@staticmethod
def _get_tables_columns_process_wrapped(
sql_query: str,
sql_query: str, use_raw_names: bool = False
) -> Tuple[List[str], List[str]]:
# Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid
# memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help
Expand All @@ -127,10 +134,7 @@ def _get_tables_columns_process_wrapped(
queue: multiprocessing.Queue = Queue()
process: multiprocessing.Process = Process(
target=sql_lineage_parser_impl_func_wrapper,
args=(
queue,
sql_query,
),
args=(queue, sql_query, use_raw_names),
)
process.start()
tables, columns, exception_details = queue.get(block=True)
Expand Down
169 changes: 169 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,175 @@ def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted():
assert parser.get_tables() == ["project.dataset.src_tbl"]


def test_bigquery_sql_lineage_camel_case_table():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.dataset.CamelCaseTable`
""",
use_raw_names=True,
)

assert parser.get_tables() == ["project.dataset.CamelCaseTable"]


def test_bigquery_sql_lineage_camel_case_dataset():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.DataSet.table`
""",
use_raw_names=True,
)

assert parser.get_tables() == ["project.DataSet.table"]


def test_bigquery_sql_lineage_camel_case_table_and_dataset():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.DataSet.CamelTable`
""",
use_raw_names=True,
)

assert parser.get_tables() == ["project.DataSet.CamelTable"]


def test_bigquery_sql_lineage_camel_case_table_and_dataset_subquery():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM (
# this comment will not break sqllineage either
SELECT * FROM `project.DataSet.CamelTable`
)
""",
use_raw_names=True,
)

assert parser.get_tables() == ["project.DataSet.CamelTable"]


def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.DataSet1.CamelTable`
INNER JOIN `project.DataSet2.CamelTable2`
ON b.id = a.id
LEFT JOIN `project.DataSet3.CamelTable3`
on c.id = b.id
""",
use_raw_names=True,
)

assert parser.get_tables() == [
"project.DataSet1.CamelTable",
"project.DataSet2.CamelTable2",
"project.DataSet3.CamelTable3",
]


def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins_and_subquery():
"""
This test aims to test the parameter to ignore sqllineage lowercasing.
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
The lowercasing, by default, breaks the lineage construction in these cases.
"""
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo, bar
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.DataSet1.CamelTable` a
INNER JOIN `project.DataSet2.CamelTable2` b
ON b.id = a.id
LEFT JOIN (SELECT * FROM `project.DataSet3.CamelTable3`) c
ON c.id = b.id
""",
use_raw_names=True,
)

assert parser.get_tables() == [
"project.DataSet1.CamelTable",
"project.DataSet2.CamelTable2",
"project.DataSet3.CamelTable3",
]


def test_bigquery_sql_lineage_keyword_data_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
Expand Down

0 comments on commit f0a3719

Please sign in to comment.