Skip to content

Commit

Permalink
Cancel BigQuery job if timeout hits (feast-dev#1672)
Browse files Browse the repository at this point in the history
* Cancel BigQuery job if timeout hits

Signed-off-by: Matt Delacour <[email protected]>

* Fix typo

Signed-off-by: Matt Delacour <[email protected]>
Signed-off-by: Mwad22 <[email protected]>
  • Loading branch information
MattDelac authored and Mwad22 committed Jul 7, 2021
1 parent d0fe0a9 commit 6e8670e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
)


class BigQueryJobCancelled(Exception):
    """Error reporting that a BigQuery job was cancelled.

    Args:
        job_id: Identifier of the cancelled job; interpolated into the message.
    """

    def __init__(self, job_id):
        message = f"The BigQuery job with ID '{job_id}' was cancelled"
        super().__init__(message)


class RedshiftCredentialsError(Exception):
    """Error reporting that a Redshift API call failed due to bad credentials."""

    def __init__(self):
        message = "Redshift API failed due to incorrect credentials"
        super().__init__(message)
Expand Down
27 changes: 21 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from feast import errors
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
Expand Down Expand Up @@ -251,10 +251,6 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
Returns the destination table name or returns None if job_config.dry_run is True.
"""

@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
def _block_until_done():
return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]

if not job_config:
today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
Expand All @@ -263,7 +259,7 @@ def _block_until_done():

bq_job = self.client.query(self.query, job_config=job_config)

_block_until_done()
block_until_done(client=self.client, bq_job=bq_job)

if bq_job.exception():
raise bq_job.exception()
Expand All @@ -281,6 +277,25 @@ def to_arrow(self) -> pyarrow.Table:
return self.client.query(self.query).to_arrow()


def block_until_done(client, bq_job, timeout=1800, retry_cadence=10):
    """Wait for a BigQuery job to finish, cancelling it if the timeout hits.

    The job state is polled through ``client.get_job`` every ``retry_cadence``
    seconds for at most ``timeout`` seconds. If the job is still pending or
    running once the polling window is exhausted, the job is cancelled.

    Args:
        client: BigQuery client exposing ``get_job`` and ``cancel_job``.
        bq_job: The submitted job; must expose ``job_id`` and ``exception()``.
        timeout: Maximum number of seconds to keep polling before cancelling.
        retry_cadence: Number of seconds to wait between two state checks.

    Raises:
        BigQueryJobCancelled: If the job was still unfinished after ``timeout``
            seconds and had to be cancelled.
        Exception: Whatever ``bq_job.exception()`` returns when the job failed.
    """
    unfinished_states = ("PENDING", "RUNNING")

    class _JobStillRunning(Exception):
        """Internal signal used to make the retry decorator poll again."""

    def _is_running(job_id):
        return client.get_job(job_id).state in unfinished_states

    # The retry decorator is used with its default retry condition, which
    # re-invokes the function only when it raises; a returned boolean would be
    # ignored and no polling would happen. Hence "still running" is signalled
    # with an exception.
    @retry(
        wait=wait_fixed(retry_cadence),
        stop=stop_after_delay(timeout),
        reraise=True,
    )
    def _wait_until_done(job_id):
        if _is_running(job_id):
            raise _JobStillRunning(job_id)

    job_id = bq_job.job_id
    try:
        _wait_until_done(job_id)
    except _JobStillRunning:
        # Polling window exhausted; the check below decides whether to cancel.
        pass

    # Cancel only when the job has NOT reached a terminal state.
    if _is_running(job_id):
        client.cancel_job(job_id)
        raise BigQueryJobCancelled(job_id=job_id)

    # Fetch the job error once and surface it to the caller if there is one.
    job_error = bq_job.exception()
    if job_error:
        raise job_error


@dataclass(frozen=True)
class FeatureViewQueryContext:
"""Context object used to template a BigQuery point-in-time SQL query"""
Expand Down

0 comments on commit 6e8670e

Please sign in to comment.