diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 8a593b23d1f9c..20a43a94f6bda 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -454,7 +454,7 @@ }, # FIXME: I don't think tableau uses sqllineage anymore so we should be able # to remove that dependency. - "tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib, + "tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib, "teradata": sql_common | usage_common | sqlglot_lib diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 788bec97a6488..a4de8b382430c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -137,7 +137,10 @@ def report_log( ) # Add the simple exception details to the context. - context = f"{context}: {exc}" + if context: + context = f"{context} {type(exc)}: {exc}" + else: + context = f"{type(exc)}: {exc}" elif log: logger.log(level=level.value, msg=log_content, stacklevel=stacklevel) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index ee1ccdff781dc..7ba06fe24155d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -298,7 +298,7 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: if copy_previous_state_and_fail: logger.info( - f"Copying urns from last state (size {last_checkpoint_state.urns}) to current state (size {cur_checkpoint_state.urns}) " + f"Copying urns from last state (size {len(last_checkpoint_state.urns)}) to current state (size {len(cur_checkpoint_state.urns)}) " "to ensure stale entities from previous runs are deleted on the next successful run." ) for urn in last_checkpoint_state.urns: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 1655724f2d402..9cde3b1f8d3a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -1,5 +1,6 @@ import logging import re +import time from collections import OrderedDict from dataclasses import dataclass from datetime import datetime @@ -13,6 +14,7 @@ Optional, Set, Tuple, + Type, Union, cast, ) @@ -158,6 +160,21 @@ from datahub.utilities import config_clean from datahub.utilities.urns.dataset_urn import DatasetUrn +try: + # On earlier versions of the tableauserverclient, the NonXMLResponseError + # was thrown when reauthentication was needed. We'll keep both exceptions + # around for now, but can remove this in the future. + from tableauserverclient.server.endpoint.exceptions import ( # type: ignore + NotSignedInError, + ) + + REAUTHENTICATE_ERRORS: Tuple[Type[Exception], ...] = ( + NotSignedInError, + NonXMLResponseError, + ) +except ImportError: + REAUTHENTICATE_ERRORS = (NonXMLResponseError,) + logger: logging.Logger = logging.getLogger(__name__) # Replace / with | @@ -965,7 +982,7 @@ def get_connection_object_page( query_data = query_metadata( self.server, query, connection_type, count, offset, query_filter ) - except NonXMLResponseError: + except REAUTHENTICATE_ERRORS: if not retry_on_auth_error: raise @@ -1038,6 +1055,35 @@ def get_connection_object_page( ) else: + # As of Tableau Server 2024.2, the metadata API sporadically returns a 30 second + # timeout error. It doesn't reliably happen, so retrying a couple times makes sense. + if all( + error.get("message") + == "Execution canceled because timeout of 30000 millis was reached" + for error in errors + ): + # If it was only a timeout error, we can retry. + if retries_remaining <= 0: + raise + + # This is a pretty dumb backoff mechanism, but it's good enough for now. + backoff_time = min( + (self.config.max_retries - retries_remaining + 1) ** 2, 60 + ) + logger.info( + f"Query {connection_type} received a 30 second timeout error - will retry in {backoff_time} seconds. " + f"Retries remaining: {retries_remaining}" + ) + time.sleep(backoff_time) + return self.get_connection_object_page( + query, + connection_type, + query_filter, + count, + offset, + retry_on_auth_error=False, + retries_remaining=retries_remaining - 1, + ) raise RuntimeError(f"Query {connection_type} error: {errors}") connection_object = query_data.get(c.DATA, {}).get(connection_type, {})