Skip to content

Commit

Permalink
Add bigquery table create disposition to offline store
Browse files Browse the repository at this point in the history
  • Loading branch information
nickozilla committed Sep 12, 2023
1 parent a8aeee9 commit 84237af
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import StrictStr, validator
from pydantic import StrictStr, validator, ConstrainedStr
from pydantic.typing import Literal
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

Expand Down Expand Up @@ -71,6 +71,11 @@
def get_http_client_info():
return http_client_info.ClientInfo(user_agent=get_user_agent())

class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""
values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}


class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for GCP BigQuery"""
Expand All @@ -95,6 +100,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""

@validator("billing_project_id")
def project_id_exists(cls, v, values, **kwargs):
if v and not values["project_id"]:
Expand Down Expand Up @@ -324,6 +332,7 @@ def write_logged_features(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
create_disposition=config.offline_store.table_create_disposition,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=source.get_log_timestamp_column(),
Expand Down Expand Up @@ -384,6 +393,7 @@ def offline_write_batch(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(pa_schema),
create_disposition=config.offline_store.table_create_disposition,
write_disposition="WRITE_APPEND", # Default but included for clarity
)

Expand Down

0 comments on commit 84237af

Please sign in to comment.