Skip to content

Commit

Permalink
refactor(redshift): Improve redshift error handling with new structur…
Browse files Browse the repository at this point in the history
…ed reporting system (datahub-project#10870)

Co-authored-by: John Joyce <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
3 people authored and aviv-julienjehannet committed Jul 25, 2024
1 parent 13635bc commit 895221d
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,22 @@ def _generate_single_profile(
except Exception as e:
if not self.config.catch_exceptions:
raise e
logger.exception(f"Encountered exception while profiling {pretty_name}")
self.report.report_warning(pretty_name, f"Profiling exception {e}")

error_message = str(e).lower()
if "permission denied" in error_message:
self.report.warning(
title="Unauthorized to extract data profile statistics",
message="We were denied access while attempting to generate profiling statistics for some assets. Please ensure the provided user has permission to query these tables and views.",
context=f"Asset: {pretty_name}",
exc=e,
)
else:
self.report.warning(
title="Failed to extract statistics for some assets",
message="Caught unexpected exception while attempting to extract profiling statistics for some assets.",
context=f"Asset: {pretty_name}",
exc=e,
)
return None
finally:
if batch is not None and self.base_engine.engine.name == TRINO:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Callable, Iterable, TypeVar, Union

import redshift_connector
from typing_extensions import ParamSpec

from datahub.ingestion.source.redshift.report import RedshiftReport

T = TypeVar("T")
P = ParamSpec("P")


def handle_redshift_exceptions(
report: RedshiftReport,
func: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> Union[T, None]:
try:
return func(*args, **kwargs)
except redshift_connector.Error as e:
report_redshift_failure(report, e)
return None


def handle_redshift_exceptions_yield(
report: RedshiftReport,
func: Callable[P, Iterable[T]],
*args: P.args,
**kwargs: P.kwargs,
) -> Iterable[T]:
try:
yield from func(*args, **kwargs)
except redshift_connector.Error as e:
report_redshift_failure(report, e)


def report_redshift_failure(
report: RedshiftReport, e: redshift_connector.Error
) -> None:
error_message = str(e).lower()
if "permission denied" in error_message:
if "svv_table_info" in error_message:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svv_table_info' table. Please ensure the provided database user has access.",
exc=e,
)
elif "svl_user_info" in error_message:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svl_user_info' table. Please ensure the provided database user has access.",
exc=e,
)
else:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permissions.",
exc=e,
)
else:
report.report_failure(
title="Failed to extract some metadata",
message="Failed to extract some metadata from Redshift.",
exc=e,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import collections
import logging
import traceback
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import redshift_connector
Expand Down Expand Up @@ -249,8 +248,10 @@ def _populate_lineage_agg(
processor(lineage_row)
except Exception as e:
self.report.warning(
f"lineage-v2-extract-{lineage_type.name}",
f"Error was {e}, {traceback.format_exc()}",
title="Failed to extract some lineage",
message=f"Failed to extract lineage of type {lineage_type.name}",
context=f"Query: '{query}'",
exc=e,
)
self._lineage_v1.report_status(f"extract-{lineage_type.name}", False)

Expand Down Expand Up @@ -417,3 +418,9 @@ def _process_external_tables(
def generate(self) -> Iterable[MetadataWorkUnit]:
for mcp in self.aggregator.gen_metadata():
yield mcp.as_workunit()
if len(self.aggregator.report.observed_query_parse_failures) > 0:
self.report.report_failure(
title="Failed to extract some SQL lineage",
message="Unexpected error(s) while attempting to extract lineage from SQL queries. See the full logs for more details.",
context=f"Query Parsing Failures: {self.aggregator.report.observed_query_parse_failures}",
)
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,26 @@ def get_workunits(
if not self.config.schema_pattern.allowed(schema):
continue
for table in tables[db].get(schema, {}):
if (
not self.config.profiling.profile_external_tables
and table.type == "EXTERNAL_TABLE"
):
self.report.profiling_skipped_other[schema] += 1
logger.info(
f"Skipping profiling of external table {db}.{schema}.{table.name}"
)
continue
if table.type == "EXTERNAL_TABLE":
if not self.config.profiling.profile_external_tables:
# Case 1: If user did not tell us to profile external tables, simply log this.
self.report.profiling_skipped_other[schema] += 1
logger.info(
f"Skipping profiling of external table {db}.{schema}.{table.name}"
)
# Continue, since we should not profile this table.
continue
elif self.config.profiling.profile_table_level_only:
# Case 2: User DID tell us to profile external tables, but only at the table level.
# Currently, we do not support this combination. The user needs to also set
# profile_table_level_only to False in order to profile.
self.report.report_warning(
title="Skipped profiling for external tables",
message="External tables are not supported for profiling when 'profile_table_level_only' config is set to 'True'. Please set 'profile_table_level_only' to 'False' in order to profile external Redshift tables.",
context=f"External Table: {db}.{schema}.{table.name}",
)
# Continue, since we were unable to retrieve cheap profiling stats from svv_table_info.
continue
# Emit the profile work unit
profile_request = self.get_profile_request(table, schema, db)
if profile_request is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def list_tables(
else:
return f"{tables_query} UNION {external_tables_query}"

# Why is this unused. Is this a bug?
list_columns: str = """
SELECT
n.nspname as "schema",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
DatasetSubTypes,
)
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.exception import handle_redshift_exceptions_yield
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.lineage_v2 import RedshiftSqlLineageV2
from datahub.ingestion.source.redshift.profile import RedshiftProfiler
Expand Down Expand Up @@ -411,17 +412,33 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
connection = RedshiftSource.get_redshift_connection(self.config)
connection = self._try_get_redshift_connection(self.config)

if connection is None:
# If we failed to establish a connection, short circuit the connector.
return

database = self.config.database
logger.info(f"Processing db {database}")
self.report.report_ingestion_stage_start(METADATA_EXTRACTION)
self.db_tables[database] = defaultdict()
self.db_views[database] = defaultdict()
self.db_schemas.setdefault(database, {})

# TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping
# this fallback. For now, this gets us broad coverage quickly.
yield from handle_redshift_exceptions_yield(
self.report, self._extract_metadata, connection, database
)

def _extract_metadata(
self, connection: redshift_connector.Connection, database: str
) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:

yield from self.gen_database_container(
database=database,
)

self.cache_tables_and_views(connection, database)

self.report.tables_in_mem_size[database] = humanfriendly.format_size(
Expand Down Expand Up @@ -556,6 +573,7 @@ def process_schema(
):
for table in self.db_tables[schema.database][schema.name]:
table.columns = schema_columns[schema.name].get(table.name, [])
table.column_count = len(table.columns)
table_wu_generator = self._process_table(
table, database=database
)
Expand All @@ -575,8 +593,10 @@ def process_schema(
f"Table processed: {schema.database}.{schema.name}.{table.name}"
)
else:
logger.info(
f"No tables in cache for {schema.database}.{schema.name}, skipping"
self.report.info(
title="No tables found in some schemas",
message="No tables found in some schemas. This may be due to insufficient privileges for the provided user.",
context=f"Schema: {schema.database}.{schema.name}",
)
else:
logger.info("Table processing disabled, skipping")
Expand All @@ -589,6 +609,7 @@ def process_schema(
):
for view in self.db_views[schema.database][schema.name]:
view.columns = schema_columns[schema.name].get(view.name, [])
view.column_count = len(view.columns)
yield from self._process_view(
table=view, database=database, schema=schema
)
Expand All @@ -603,8 +624,10 @@ def process_schema(
f"Table processed: {schema.database}.{schema.name}.{view.name}"
)
else:
logger.info(
f"No views in cache for {schema.database}.{schema.name}, skipping"
self.report.info(
title="No views found in some schemas",
message="No views found in some schemas. This may be due to insufficient privileges for the provided user.",
context=f"Schema: {schema.database}.{schema.name}",
)
else:
logger.info("View processing disabled, skipping")
Expand Down Expand Up @@ -1088,3 +1111,43 @@ def add_config_to_report(self):
self.config.start_time,
self.config.end_time,
)

def _try_get_redshift_connection(
self,
config: RedshiftConfig,
) -> Optional[redshift_connector.Connection]:
try:
return RedshiftSource.get_redshift_connection(config)
except redshift_connector.Error as e:
error_message = str(e).lower()
if "password authentication failed" in error_message:
self.report.report_failure(
title="Invalid credentials",
message="Failed to connect to Redshift. Please verify your username, password, and database.",
exc=e,
)
elif "timeout" in error_message:
self.report.report_failure(
title="Unable to connect",
message="Failed to connect to Redshift. Please verify your host name and port number.",
exc=e,
)
elif "communication error" in error_message:
self.report.report_failure(
title="Unable to connect",
message="Failed to connect to Redshift. Please verify that the host name is valid and reachable.",
exc=e,
)
elif "database" in error_message and "does not exist" in error_message:
self.report.report_failure(
title="Database does not exist",
message="Failed to connect to Redshift. Please verify that the provided database exists and the provided user has access to it.",
exc=e,
)
else:
self.report.report_failure(
title="Unable to connect",
message="Failed to connect to Redshift. Please verify your connection details.",
exc=e,
)
return None
Loading

0 comments on commit 895221d

Please sign in to comment.