Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel BigQuery job if timeout hits #1672

Merged
merged 2 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
)


class BigQueryJobCancelled(Exception):
    """Error reporting that a BigQuery job, identified by its ID, was cancelled."""

    def __init__(self, job_id):
        # Compose the message separately so the interpolated job ID stands out.
        message = f"The BigQuery job with ID '{job_id}' was cancelled"
        super().__init__(message)


class RedshiftCredentialsError(Exception):
    """Error reporting that a Redshift API call failed because of incorrect credentials."""

    def __init__(self):
        # The message is fixed; this error carries no per-instance details.
        message = "Redshift API failed due to incorrect credentials"
        super().__init__(message)
Expand Down
27 changes: 21 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from feast import errors
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
Expand Down Expand Up @@ -249,10 +249,6 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
Returns the destination table name or returns None if job_config.dry_run is True.
"""

@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
def _block_until_done():
return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]

if not job_config:
today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
Expand All @@ -261,7 +257,7 @@ def _block_until_done():

bq_job = self.client.query(self.query, job_config=job_config)

_block_until_done()
block_until_done(client=self.client, bq_job=bq_job)

if bq_job.exception():
raise bq_job.exception()
Expand All @@ -279,6 +275,25 @@ def to_table(self) -> pyarrow.Table:
return self.client.query(self.query).to_arrow()


def block_until_done(client, bq_job, timeout=1800, retry_cadence=10):
    """Wait for a BigQuery job to finish, cancelling it if the timeout hits.

    The job state is polled through ``client.get_job(job_id).state`` until it
    is no longer ``"PENDING"`` or ``"RUNNING"``. If the job is still in one of
    those states once ``timeout`` seconds have elapsed, it is cancelled via
    ``client.cancel_job`` and ``BigQueryJobCancelled`` is raised.

    Args:
        client: Object exposing ``get_job(job_id)`` and ``cancel_job(job_id)``
            (presumably a ``google.cloud.bigquery.Client`` -- confirm at the
            call site).
        bq_job: Job handle exposing ``job_id`` and ``exception()``.
        timeout: Maximum number of seconds to wait before cancelling.
        retry_cadence: Number of seconds to sleep between two state polls.

    Raises:
        BigQueryJobCancelled: The job was still pending/running at the
            deadline and has been asked to cancel.
        Exception: Whatever ``bq_job.exception()`` returns, if it is truthy
            once the job has left the pending/running states.
    """
    # Function-scope import so the block does not depend on the module's
    # top-level import list.
    import time

    job_id = bq_job.job_id
    # Monotonic clock: the deadline must not move if the wall clock changes.
    deadline = time.monotonic() + timeout

    while client.get_job(job_id).state in ("PENDING", "RUNNING"):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Only a job that is *still* running at the deadline is cancelled.
            client.cancel_job(job_id)
            raise BigQueryJobCancelled(job_id=job_id)
        # Never sleep past the deadline.
        time.sleep(min(retry_cadence, remaining))

    job_error = bq_job.exception()
    if job_error:
        raise job_error


@dataclass(frozen=True)
class FeatureViewQueryContext:
"""Context object used to template a BigQuery point-in-time SQL query"""
Expand Down