From 31e40dae57cdf1ac60561f5e66b831ea0d84d2b1 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Tue, 29 Jun 2021 11:29:16 -0400 Subject: [PATCH] Cancel BigQuery job if timeout hits (#1672) * Cancel BigQuery job if timedout hits Signed-off-by: Matt Delacour * Fix typo Signed-off-by: Matt Delacour --- sdk/python/feast/errors.py | 5 ++++ .../feast/infra/offline_stores/bigquery.py | 27 ++++++++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 2004f303fd..5eb0ea8422 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -126,6 +126,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str): ) +class BigQueryJobCancelled(Exception): + def __init__(self, job_id): + super().__init__(f"The BigQuery job with ID '{job_id}' was cancelled") + + class RedshiftCredentialsError(Exception): def __init__(self): super().__init__("Redshift API failed due to incorrect credentials") diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 3b73248ee3..bcfc5f08f4 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -13,7 +13,7 @@ from feast import errors from feast.data_source import BigQuerySource, DataSource -from feast.errors import FeastProviderLoginError +from feast.errors import BigQueryJobCancelled, FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.provider import ( @@ -249,10 +249,6 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st Returns the destination table name or returns None if job_config.dry_run is True. """ - @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True) - def _block_until_done(): - return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"] - if not job_config: today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] @@ -261,7 +257,7 @@ def _block_until_done(): bq_job = self.client.query(self.query, job_config=job_config) - _block_until_done() + block_until_done(client=self.client, bq_job=bq_job) if bq_job.exception(): raise bq_job.exception() @@ -279,6 +275,25 @@ def to_arrow(self) -> pyarrow.Table: return self.client.query(self.query).to_arrow() +def block_until_done(client, bq_job): + def _is_done(job_id): + return client.get_job(job_id).state in ["PENDING", "RUNNING"] + + @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True) + def _wait_until_done(job_id): + return _is_done(job_id) + + job_id = bq_job.job_id + _wait_until_done(job_id=job_id) + + if not _is_done(job_id): + client.cancel_job(job_id) + raise BigQueryJobCancelled(job_id=job_id) + + if bq_job.exception(): + raise bq_job.exception() + + @dataclass(frozen=True) class FeatureViewQueryContext: """Context object used to template a BigQuery point-in-time SQL query"""