Cancel BigQuery job if block_until_done call times out or is interrupted #1699

Merged
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
@@ -157,6 +157,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
        )


class BigQueryJobStillRunning(Exception):
    def __init__(self, job_id):
        super().__init__(f"The BigQuery job with ID '{job_id}' is still running.")


class BigQueryJobCancelled(Exception):
    def __init__(self, job_id):
        super().__init__(f"The BigQuery job with ID '{job_id}' was cancelled")
73 changes: 53 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -10,11 +10,15 @@
from pandas import Timestamp
from pydantic import StrictStr
from pydantic.typing import Literal
from tenacity import retry, stop_after_delay, wait_fixed
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

from feast import errors
from feast.data_source import BigQuerySource, DataSource
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
from feast.errors import (
    BigQueryJobCancelled,
    BigQueryJobStillRunning,
    FeastProviderLoginError,
)
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
@@ -249,12 +253,20 @@ def to_sql(self) -> str:
"""
return self.query

def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
def to_bigquery(
self,
job_config: bigquery.QueryJobConfig = None,
timeout: int = 1800,
retry_cadence: int = 10,
) -> Optional[str]:
"""
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).

Args:
job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
timeout: An optional number of seconds for setting the time limit of the QueryJob.
retry_cadence: An optional number of seconds for setting how long the job should checked for completion.

Returns:
Returns the destination table name or returns None if job_config.dry_run is True.
@@ -274,10 +286,7 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
            )
            return None

        block_until_done(client=self.client, bq_job=bq_job)

        if bq_job.exception():
            raise bq_job.exception()
        block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)

        print(f"Done writing to '{job_config.destination}'.")
        return str(job_config.destination)
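
For illustration, a minimal usage sketch of the new parameters follows (not part of the diff; retrieval_job and the destination table ID are hypothetical):

from google.cloud import bigquery

# Hypothetical caller: export a historical retrieval job's results, polling
# every 5 seconds and cancelling the BigQuery job if it runs past 10 minutes.
job_config = bigquery.QueryJobConfig(
    destination=bigquery.TableReference.from_string(
        "my-project.my_dataset.my_table"  # hypothetical table ID
    )
)
table_name = retrieval_job.to_bigquery(
    job_config=job_config,
    timeout=600,      # seconds before the job is cancelled
    retry_cadence=5,  # seconds between completion checks
)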
@@ -286,23 +295,47 @@ def to_arrow(self) -> pyarrow.Table:
        return self.client.query(self.query).to_arrow()


def block_until_done(client, bq_job):
    def _is_done(job_id):
        return client.get_job(job_id).state in ["PENDING", "RUNNING"]
def block_until_done(
    client: Client,
    bq_job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob],
    timeout: int = 1800,
    retry_cadence: int = 10,
):
    """
    Waits for bq_job to finish running, up to a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).

    Args:
        client: A bigquery.client.Client to monitor the bq_job.
        bq_job: The bigquery.job.QueryJob or bigquery.job.LoadJob to block on until done running.
        timeout: An optional number of seconds for setting the time limit of the job.
        retry_cadence: An optional number of seconds for setting how frequently the job should be checked for completion.

    Raises:
        BigQueryJobStillRunning exception if the function has blocked longer than the timeout.
        BigQueryJobCancelled exception to signify that the job has been cancelled (e.g. due to a timeout or KeyboardInterrupt).
    """

    @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
    def _wait_until_done(job_id):
        return _is_done(job_id)
        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
            raise BigQueryJobStillRunning(job_id=job_id)

    job_id = bq_job.job_id
    _wait_until_done(job_id=job_id)
    try:
        retryer = Retrying(
            wait=wait_fixed(retry_cadence),
            stop=stop_after_delay(timeout),
            retry=retry_if_exception_type(BigQueryJobStillRunning),
            reraise=True,
        )
        retryer(_wait_until_done, job_id)

    if bq_job.exception():
        raise bq_job.exception()
    finally:
        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
            client.cancel_job(job_id)
            raise BigQueryJobCancelled(job_id=job_id)

    if not _is_done(job_id):
        client.cancel_job(job_id)
        raise BigQueryJobCancelled(job_id=job_id)
    if bq_job.exception():
        raise bq_job.exception()
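
The pattern above is a poll-and-cancel loop built from tenacity primitives: the inner check raises BigQueryJobStillRunning while the job is PENDING or RUNNING, Retrying converts that exception into a fixed-interval retry until the deadline, and the finally block guarantees the job is cancelled whenever the loop exits early (timeout, KeyboardInterrupt, or any other error). A self-contained sketch of the same retry pattern with a fake job, runnable without BigQuery:

import time

from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed


class StillRunning(Exception):
    """Raised while the fake job has not finished."""


start = time.monotonic()


def _check_done():
    # Pretend the job needs about 3 seconds to finish.
    if time.monotonic() - start < 3:
        raise StillRunning()


retryer = Retrying(
    wait=wait_fixed(1),                           # poll once per second
    stop=stop_after_delay(10),                    # give up after 10 seconds
    retry=retry_if_exception_type(StillRunning),  # retry only while "still running"
    reraise=True,                                 # reraise StillRunning at the deadline
)
retryer(_check_done)  # returns once _check_done stops raising
print("job finished")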


@dataclass(frozen=True)
@@ -354,7 +387,7 @@ def _upload_entity_df_into_bigquery(

    if type(entity_df) is str:
        job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
        job.result()
        block_until_done(client, job)
    elif isinstance(entity_df, pandas.DataFrame):
        # Drop the index so that we don't have unnecessary columns
        entity_df.reset_index(drop=True, inplace=True)
@@ -364,7 +397,7 @@
        job = client.load_table_from_dataframe(
            entity_df, table_id, job_config=job_config
        )
        job.result()
        block_until_done(client, job)
    else:
        raise ValueError(
            f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
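
Replacing job.result() with block_until_done in both branches gives entity-table uploads the same cancellation behaviour as feature retrieval. A hedged sketch of what a caller can now rely on (the CTAS query and table are hypothetical):

from google.cloud import bigquery

from feast.errors import BigQueryJobCancelled
from feast.infra.offline_stores.bigquery import block_until_done

client = bigquery.Client()
# Hypothetical entity-dataframe upload expressed as a CTAS query.
job = client.query("CREATE TABLE tmp.entity_df AS (SELECT 1 AS driver_id)")
try:
    block_until_done(client, job, timeout=300)
except BigQueryJobCancelled:
    # The job ran past the timeout (or was interrupted) and has already been
    # cancelled server-side by block_until_done's finally block.
    ...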