Skip to content

Commit

Permalink
Cancel BigQuery job if block_until_done call times out or is interrup…
Browse files Browse the repository at this point in the history
…ted (#1699)

* Cancel job if to_bigquery is cancelled by user

Signed-off-by: Cody Lin <[email protected]>

* cancel job in _upload_entity_df_into_bq as well

Signed-off-by: Cody Lin <[email protected]>

* Fix _is_done logic?

Signed-off-by: Cody Lin <[email protected]>

* make cancel job code more readable

Signed-off-by: Cody Lin <[email protected]>

* move KeyboardInterrupt catch outside retry logic; fix retry logic

Signed-off-by: Cody Lin <[email protected]>

* make block_until_done public; add custom exception for BQJobStillRunning

Signed-off-by: Cody Lin <[email protected]>

* fix retry logic to catch specific exception

Signed-off-by: Cody Lin <[email protected]>

* Make retry params configurable; use finally clause to catch more cancellation cases

Signed-off-by: Cody Lin <[email protected]>

* Modify docstring

Signed-off-by: Cody Lin <[email protected]>

* Typo in docstring

Signed-off-by: Cody Lin <[email protected]>

* Fix lint

Signed-off-by: Cody Lin <[email protected]>
  • Loading branch information
codyjlin authored Jul 14, 2021
1 parent 703c4be commit 88489d9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
)


class BigQueryJobStillRunning(Exception):
def __init__(self, job_id):
super().__init__(f"The BigQuery job with ID '{job_id}' is still running.")


class BigQueryJobCancelled(Exception):
def __init__(self, job_id):
super().__init__(f"The BigQuery job with ID '{job_id}' was cancelled")
Expand Down
73 changes: 53 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@
from pandas import Timestamp
from pydantic import StrictStr
from pydantic.typing import Literal
from tenacity import retry, stop_after_delay, wait_fixed
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

from feast import errors
from feast.data_source import BigQuerySource, DataSource
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
from feast.errors import (
BigQueryJobCancelled,
BigQueryJobStillRunning,
FeastProviderLoginError,
)
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
Expand Down Expand Up @@ -249,12 +253,20 @@ def to_sql(self) -> str:
"""
return self.query

def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
def to_bigquery(
self,
job_config: bigquery.QueryJobConfig = None,
timeout: int = 1800,
retry_cadence: int = 10,
) -> Optional[str]:
"""
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
Args:
job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
timeout: An optional number of seconds for setting the time limit of the QueryJob.
retry_cadence: An optional number of seconds for setting how long the job should checked for completion.
Returns:
Returns the destination table name or returns None if job_config.dry_run is True.
Expand All @@ -274,10 +286,7 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
)
return None

block_until_done(client=self.client, bq_job=bq_job)

if bq_job.exception():
raise bq_job.exception()
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)

print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)
Expand All @@ -286,23 +295,47 @@ def to_arrow(self) -> pyarrow.Table:
return self.client.query(self.query).to_arrow()


def block_until_done(client, bq_job):
def _is_done(job_id):
return client.get_job(job_id).state in ["PENDING", "RUNNING"]
def block_until_done(
client: Client,
bq_job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob],
timeout: int = 1800,
retry_cadence: int = 10,
):
"""
Waits for bq_job to finish running, up to a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
Args:
client: A bigquery.client.Client to monitor the bq_job.
bq_job: The bigquery.job.QueryJob that blocks until done runnning.
timeout: An optional number of seconds for setting the time limit of the job.
retry_cadence: An optional number of seconds for setting how long the job should checked for completion.
Raises:
BigQueryJobStillRunning exception if the function has blocked longer than 30 minutes.
BigQueryJobCancelled exception to signify when that the job has been cancelled (i.e. from timeout or KeyboardInterrupt).
"""

@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
def _wait_until_done(job_id):
return _is_done(job_id)
if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
raise BigQueryJobStillRunning(job_id=job_id)

job_id = bq_job.job_id
_wait_until_done(job_id=job_id)
try:
retryer = Retrying(
wait=wait_fixed(retry_cadence),
stop=stop_after_delay(timeout),
retry=retry_if_exception_type(BigQueryJobStillRunning),
reraise=True,
)
retryer(_wait_until_done, job_id)

if bq_job.exception():
raise bq_job.exception()
finally:
if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
client.cancel_job(job_id)
raise BigQueryJobCancelled(job_id=job_id)

if not _is_done(job_id):
client.cancel_job(job_id)
raise BigQueryJobCancelled(job_id=job_id)
if bq_job.exception():
raise bq_job.exception()


@dataclass(frozen=True)
Expand Down Expand Up @@ -354,7 +387,7 @@ def _upload_entity_df_into_bigquery(

if type(entity_df) is str:
job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
job.result()
block_until_done(client, job)
elif isinstance(entity_df, pandas.DataFrame):
# Drop the index so that we dont have unnecessary columns
entity_df.reset_index(drop=True, inplace=True)
Expand All @@ -364,7 +397,7 @@ def _upload_entity_df_into_bigquery(
job = client.load_table_from_dataframe(
entity_df, table_id, job_config=job_config
)
job.result()
block_until_done(client, job)
else:
raise ValueError(
f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
Expand Down

0 comments on commit 88489d9

Please sign in to comment.