Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pulling query from BigQuery #1403

Merged
merged 5 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
"""

bigquery_options_proto = DataSourceProto.BigQueryOptions(
table_ref=self.table_ref,
table_ref=self.table_ref, query=self.query,
)

return bigquery_options_proto
Expand Down Expand Up @@ -461,13 +461,16 @@ def from_proto(data_source):
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
elif data_source.bigquery_options.table_ref:
elif (
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
):
data_source_obj = BigQuerySource(
field_mapping=data_source.field_mapping,
table_ref=data_source.bigquery_options.table_ref,
event_timestamp_column=data_source.event_timestamp_column,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
query=data_source.bigquery_options.query,
)
elif (
data_source.kafka_options.bootstrap_servers
Expand Down
6 changes: 1 addition & 5 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ def materialize(
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
if not feature_view.input.table_ref:
raise NotImplementedError(
f"This function is only implemented for FeatureViews with a table_ref; {feature_view.name} does not have one."
)
(
entity_names,
feature_names,
Expand All @@ -229,7 +225,7 @@ def materialize(
) = _run_reverse_field_mapping(feature_view)

offline_store = get_offline_store(self.config)
table = offline_store.pull_latest_from_table(
table = offline_store.pull_latest_from_table_or_query(
feature_view.input,
entity_names,
feature_names,
Expand Down
17 changes: 8 additions & 9 deletions sdk/python/feast/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class OfflineStore(ABC):

@staticmethod
@abstractmethod
def pull_latest_from_table(
def pull_latest_from_table_or_query(
data_source: DataSource,
entity_names: List[str],
feature_names: List[str],
Expand Down Expand Up @@ -115,7 +115,7 @@ def get_historical_features(

class BigQueryOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table(
def pull_latest_from_table_or_query(
data_source: DataSource,
entity_names: List[str],
feature_names: List[str],
Expand All @@ -125,11 +125,10 @@ def pull_latest_from_table(
end_date: datetime,
) -> pyarrow.Table:
assert isinstance(data_source, BigQuerySource)
table_ref = data_source.table_ref
if table_ref is None:
raise ValueError(
"This function can only be called on a FeatureView with a table_ref"
)
if data_source.table_ref:
from_expression = f"`{data_source.table_ref}`"
else:
from_expression = f"({data_source.query})"

partition_by_entity_string = ", ".join(entity_names)
if partition_by_entity_string != "":
Expand All @@ -145,7 +144,7 @@ def pull_latest_from_table(
FROM (
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_entity_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM `{table_ref}`
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
)
WHERE _feast_row = 1
Expand Down Expand Up @@ -287,7 +286,7 @@ def build_point_in_time_query(

class FileOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table(
def pull_latest_from_table_or_query(
data_source: DataSource,
entity_names: List[str],
feature_names: List[str],
Expand Down
93 changes: 0 additions & 93 deletions sdk/python/tests/test_bigquery_ingestion.py

This file was deleted.

132 changes: 132 additions & 0 deletions sdk/python/tests/test_materialize_from_bigquery_to_datastore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import time
from datetime import datetime, timedelta

import pandas as pd
import pytest
from google.cloud import bigquery

from feast.data_source import BigQuerySource
from feast.feature import Feature
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


@pytest.mark.integration
class TestMaterializeFromBigQueryToDatastore:
    """Integration tests for materializing features from a BigQuery source
    into the GCP (Datastore) online store.

    Covers both flavors of ``BigQuerySource``: one backed by a ``table_ref``
    and one backed by a SQL ``query``.  Each test loads a tiny fixture
    dataframe into BigQuery, materializes the last five minutes of data,
    and asserts that the online store serves the *latest* value per entity.

    Requires live GCP credentials; runs only with ``pytest -m integration``.
    """

    def setup_method(self):
        # Create (or reuse) a scratch BigQuery dataset for fixture tables.
        # A default table expiration keeps repeated test runs from
        # accumulating storage indefinitely.
        self.client = bigquery.Client()
        self.gcp_project = self.client.project
        self.bigquery_dataset = "test_ingestion"
        dataset = bigquery.Dataset(f"{self.gcp_project}.{self.bigquery_dataset}")
        self.client.create_dataset(dataset, exists_ok=True)
        dataset.default_table_expiration_ms = (
            1000 * 60 * 60 * 24 * 14
        )  # 2 weeks in milliseconds
        self.client.update_dataset(dataset, ["default_table_expiration_ms"])

    def _load_fixture_table(self, table_name_prefix: str) -> str:
        """Load the shared fixture dataframe into a fresh BigQuery table.

        Returns the fully qualified table id
        (``project.dataset.prefix_<epoch>``).
        """
        ts = pd.Timestamp.now(tz="UTC").round("ms")
        data = {
            # driver 1 appears twice; the later row (value 0.3) must win.
            "id": [1, 2, 1],
            "value": [0.1, 0.2, 0.3],
            "ts_1": [ts - timedelta(minutes=2), ts, ts],
            "created_ts": [ts, ts, ts],
        }
        df = pd.DataFrame.from_dict(data)

        table_id = (
            f"{self.gcp_project}.{self.bigquery_dataset}."
            f"{table_name_prefix}_{int(time.time())}"
        )
        job_config = bigquery.LoadJobConfig()
        job = self.client.load_table_from_dataframe(
            df, table_id, job_config=job_config
        )
        job.result()  # block until the load job finishes
        return table_id

    def _materialize_and_check(self, fv: FeatureView, project_prefix: str) -> None:
        """Apply *fv*, materialize the last five minutes, and assert that
        the online store returns the latest value (0.3) for driver 1."""
        config = RepoConfig(
            metadata_store="./metadata.db",
            # Time-suffixed project name isolates state between test runs.
            project=f"{project_prefix}_{int(time.time())}",
            provider="gcp",
        )
        fs = FeatureStore(config=config)
        fs.apply([fv])

        fs.materialize(
            [fv.name],
            datetime.utcnow() - timedelta(minutes=5),
            datetime.utcnow() - timedelta(minutes=0),
        )

        response_dict = fs.get_online_features(
            [f"{fv.name}:value"], [{"driver_id": 1}]
        ).to_dict()
        assert abs(response_dict[f"{fv.name}:value"][0] - 0.3) < 1e-6

    def test_bigquery_table_to_datastore_correctness(self):
        """Materialize from a table-backed BigQuerySource."""
        table_id = self._load_fixture_table("table_correctness")

        fv = FeatureView(
            name="test_bq_table_correctness",
            entities=["driver_id"],
            features=[Feature("value", ValueType.FLOAT)],
            ttl=timedelta(minutes=5),
            input=BigQuerySource(
                event_timestamp_column="ts",
                table_ref=table_id,
                created_timestamp_column="created_ts",
                field_mapping={"ts_1": "ts", "id": "driver_id"},
                date_partition_column="",
            ),
        )
        self._materialize_and_check(fv, "test_bq_table_correctness")

    def test_bigquery_query_to_datastore_correctness(self):
        """Materialize from a query-backed BigQuerySource (no table_ref)."""
        table_id = self._load_fixture_table("query_correctness")
        query = f"SELECT * FROM `{table_id}`"

        fv = FeatureView(
            name="test_bq_query_correctness",
            entities=["driver_id"],
            features=[Feature("value", ValueType.FLOAT)],
            ttl=timedelta(minutes=5),
            input=BigQuerySource(
                event_timestamp_column="ts",
                created_timestamp_column="created_ts",
                field_mapping={"ts_1": "ts", "id": "driver_id"},
                date_partition_column="",
                query=query,
            ),
        )
        self._materialize_and_check(fv, "test_bq_query_correctness")