Skip to content

Commit

Permalink
Possibility to specify a project for BigQuery queries (feast-dev#1656)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Delacour <[email protected]>

Co-authored-by: Achal Shah <[email protected]>
Signed-off-by: Mwad22 <[email protected]>
  • Loading branch information
2 people authored and Mwad22 committed Jul 7, 2021
1 parent c2e2b4d commit d2cda24
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
22 changes: 16 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,14 @@ def get_historical_features(
expected_join_keys = _get_join_keys(project, feature_views, registry)

assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
dataset_project = config.offline_store.project_id or client.project

table = _upload_entity_df_into_bigquery(
client, config.project, config.offline_store.dataset, entity_df
client=client,
project=config.project,
dataset_name=config.offline_store.dataset,
dataset_project=dataset_project,
entity_df=entity_df,
)

entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query(
Expand Down Expand Up @@ -227,7 +233,8 @@ def _block_until_done():

today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
dataset_project = self.config.offline_store.project_id or self.client.project
path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
bq_job = self.client.query(self.query, job_config=job_config)

Expand Down Expand Up @@ -263,12 +270,12 @@ class FeatureViewQueryContext:


def _get_table_id_for_new_entity(
client: Client, project: str, dataset_name: str
client: Client, project: str, dataset_name: str, dataset_project: str
) -> str:
"""Gets the table_id for the new entity to be uploaded."""

# First create the BigQuery dataset if it doesn't exist
dataset = bigquery.Dataset(f"{client.project}.{dataset_name}")
dataset = bigquery.Dataset(f"{dataset_project}.{dataset_name}")
dataset.location = "US"

try:
Expand All @@ -277,18 +284,21 @@ def _get_table_id_for_new_entity(
# Only create the dataset if it does not exist
client.create_dataset(dataset, exists_ok=True)

return f"{client.project}.{dataset_name}.entity_df_{project}_{int(time.time())}"
return f"{dataset_project}.{dataset_name}.entity_df_{project}_{int(time.time())}"


def _upload_entity_df_into_bigquery(
client: Client,
project: str,
dataset_name: str,
dataset_project: str,
entity_df: Union[pandas.DataFrame, str],
) -> Table:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""

table_id = _get_table_id_for_new_entity(client, project, dataset_name)
table_id = _get_table_id_for_new_entity(
client, project, dataset_name, dataset_project
)

if type(entity_df) is str:
job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ class BigQueryOfflineStoreConfig(FeastBaseModel):
""" Offline store type selector"""

dataset: StrictStr = "feast"
""" (optional) BigQuery Dataset name for temporary tables """
""" (optional) BigQuery dataset name used for the BigQuery offline store """

project_id: Optional[StrictStr] = None
""" (optional) GCP project name used for the BigQuery offline store """


OfflineStoreConfig = Union[FileOfflineStoreConfig, BigQueryOfflineStoreConfig]
Expand Down

0 comments on commit d2cda24

Please sign in to comment.