Add test
Signed-off-by: Jacob Klegar <[email protected]>
jklegar committed Mar 23, 2021
1 parent 00a55e5 commit de8fdb9
Showing 4 changed files with 75 additions and 16 deletions.
7 changes: 4 additions & 3 deletions sdk/python/feast/data_source.py
@@ -171,8 +171,7 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
        """

        bigquery_options_proto = DataSourceProto.BigQueryOptions(
-            table_ref=self.table_ref,
-            query=self.query,
+            table_ref=self.table_ref, query=self.query,
        )

        return bigquery_options_proto
@@ -462,7 +461,9 @@ def from_proto(data_source):
                created_timestamp_column=data_source.created_timestamp_column,
                date_partition_column=data_source.date_partition_column,
            )
-        elif (data_source.bigquery_options.table_ref or data_source.bigquery_options.query):
+        elif (
+            data_source.bigquery_options.table_ref or data_source.bigquery_options.query
+        ):
            data_source_obj = BigQuerySource(
                field_mapping=data_source.field_mapping,
                table_ref=data_source.bigquery_options.table_ref,
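With the from_proto change above, a BigQuerySource can be built from either a table reference or a SQL query. A minimal, illustrative sketch of the two declaration styles (not taken from this commit; the BigQuery project, dataset, and table names are hypothetical, and the column names mirror the new test below):

# Declaring a BigQuerySource backed by a table reference or by a query.
# The identifiers "my-project.my_dataset.driver_stats" are made up for illustration.
from feast.data_source import BigQuerySource

table_backed_source = BigQuerySource(
    event_timestamp_column="ts",
    created_timestamp_column="created_ts",
    field_mapping={"ts_1": "ts", "id": "driver_id"},
    date_partition_column="",
    table_ref="my-project.my_dataset.driver_stats",
)

query_backed_source = BigQuerySource(
    event_timestamp_column="ts",
    created_timestamp_column="created_ts",
    field_mapping={"ts_1": "ts", "id": "driver_id"},
    date_partition_column="",
    query="SELECT * FROM `my-project.my_dataset.driver_stats`",
)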
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
@@ -225,7 +225,7 @@ def materialize(
            ) = _run_reverse_field_mapping(feature_view)

            offline_store = get_offline_store(self.config)
-            table = offline_store.pull_latest_from_table(
+            table = offline_store.pull_latest_from_table_or_query(
                feature_view.input,
                entity_names,
                feature_names,
12 changes: 6 additions & 6 deletions sdk/python/feast/offline_store.py
@@ -86,7 +86,7 @@ class OfflineStore(ABC):

    @staticmethod
    @abstractmethod
-    def pull_latest_from_table(
+    def pull_latest_from_table_or_query(
        data_source: DataSource,
        entity_names: List[str],
        feature_names: List[str],
@@ -115,7 +115,7 @@ def get_historical_features(

class BigQueryOfflineStore(OfflineStore):
    @staticmethod
-    def pull_latest_from_table(
+    def pull_latest_from_table_or_query(
        data_source: DataSource,
        entity_names: List[str],
        feature_names: List[str],
@@ -126,9 +126,9 @@ def pull_latest_from_table(
    ) -> pyarrow.Table:
        assert isinstance(data_source, BigQuerySource)
        if data_source.table_ref:
-            from_table = f"`{data_source.table_ref}`"
+            from_expression = f"`{data_source.table_ref}`"
        else:
-            from_table = f"({data_source.query})"
+            from_expression = f"({data_source.query})"

        partition_by_entity_string = ", ".join(entity_names)
        if partition_by_entity_string != "":
@@ -144,7 +144,7 @@ def pull_latest_from_table(
            FROM (
                SELECT {field_string},
                ROW_NUMBER() OVER({partition_by_entity_string} ORDER BY {timestamp_desc_string}) AS _feast_row
-                FROM {from_table}
+                FROM {from_expression}
                WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
            )
            WHERE _feast_row = 1
@@ -286,7 +286,7 @@ def build_point_in_time_query(

class FileOfflineStore(OfflineStore):
    @staticmethod
-    def pull_latest_from_table(
+    def pull_latest_from_table_or_query(
        data_source: DataSource,
        entity_names: List[str],
        feature_names: List[str],
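For readers skimming the hunks above: the renamed pull_latest_from_table_or_query picks its FROM clause from whichever of table_ref or query is set on the BigQuerySource and feeds it into the ROW_NUMBER() de-duplication query. A standalone sketch of that selection logic follows; the helper name is invented for illustration, since in the commit the logic is inlined in BigQueryOfflineStore:

# Sketch of the table-or-query selection mirrored from the diff above.
# _bq_from_expression is a hypothetical helper, not a function in the codebase.
def _bq_from_expression(table_ref: str, query: str) -> str:
    if table_ref:
        # A table reference is wrapped in backticks for BigQuery.
        return f"`{table_ref}`"
    # A raw query becomes a parenthesized subselect that the outer
    # ROW_NUMBER() de-duplication query selects FROM.
    return f"({query})"


# Example expansions:
#   _bq_from_expression("proj.ds.tbl", "")                  -> "`proj.ds.tbl`"
#   _bq_from_expression("", "SELECT * FROM `proj.ds.tbl`")   -> "(SELECT * FROM `proj.ds.tbl`)"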
70 changes: 64 additions & 6 deletions sdk/python/tests/test_bigquery_ingestion.py
@@ -29,7 +29,7 @@ def setup_method(self):
        ) # 2 weeks in milliseconds
        self.client.update_dataset(dataset, ["default_table_expiration_ms"])

-    def test_bigquery_ingestion_correctness(self):
+    def test_bigquery_table_to_datastore_correctness(self):
        # create dataset
        ts = pd.Timestamp.now(tz="UTC").round("ms")
        checked_value = (
@@ -45,15 +45,13 @@ def test_bigquery_ingestion_correctness(self):

        # load dataset into BigQuery
        job_config = bigquery.LoadJobConfig()
-        table_id = (
-            f"{self.gcp_project}.{self.bigquery_dataset}.correctness_{int(time.time())}"
-        )
+        table_id = f"{self.gcp_project}.{self.bigquery_dataset}.table_correctness_{int(time.time())}"
        job = self.client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()

        # create FeatureView
        fv = FeatureView(
-            name="test_bq_correctness",
+            name="test_bq_table_correctness",
            entities=["driver_id"],
            features=[Feature("value", ValueType.FLOAT)],
            ttl=timedelta(minutes=5),
@@ -78,7 +76,67 @@ def test_bigquery_ingestion_correctness(self):

        # run materialize()
        fs.materialize(
-            ["test_bq_correctness"],
+            [fv.name],
            datetime.utcnow() - timedelta(minutes=5),
            datetime.utcnow() - timedelta(minutes=0),
        )
+
+        # check result of materialize()
+        entity_key = EntityKeyProto(
+            entity_names=["driver_id"], entity_values=[ValueProto(int64_val=1)]
+        )
+        t, val = fs._get_provider().online_read("default", fv, entity_key)
+        assert abs(val["value"].double_val - checked_value) < 1e-6
+
+    def test_bigquery_query_to_datastore_correctness(self):
+        # create dataset
+        ts = pd.Timestamp.now(tz="UTC").round("ms")
+        checked_value = (
+            random.random()
+        ) # random value so test doesn't still work if no values written to online store
+        data = {
+            "id": [1, 2, 1],
+            "value": [0.1, 0.2, checked_value],
+            "ts_1": [ts - timedelta(minutes=2), ts, ts],
+            "created_ts": [ts, ts, ts],
+        }
+        df = pd.DataFrame.from_dict(data)
+
+        # load dataset into BigQuery
+        job_config = bigquery.LoadJobConfig()
+        table_id = f"{self.gcp_project}.{self.bigquery_dataset}.query_correctness_{int(time.time())}"
+        query = f"SELECT * FROM `{table_id}`"
+        job = self.client.load_table_from_dataframe(df, table_id, job_config=job_config)
+        job.result()
+
+        # create FeatureView
+        fv = FeatureView(
+            name="test_bq_query_correctness",
+            entities=["driver_id"],
+            features=[Feature("value", ValueType.FLOAT)],
+            ttl=timedelta(minutes=5),
+            input=BigQuerySource(
+                event_timestamp_column="ts",
+                created_timestamp_column="created_ts",
+                field_mapping={"ts_1": "ts", "id": "driver_id"},
+                date_partition_column="",
+                query=query,
+            ),
+        )
+        config = RepoConfig(
+            metadata_store="./metadata.db",
+            project="default",
+            provider="gcp",
+            online_store=OnlineStoreConfig(
+                local=LocalOnlineStoreConfig("online_store.db")
+            ),
+        )
+        fs = FeatureStore(config=config)
+        fs.apply([fv])
+
+        # run materialize()
+        fs.materialize(
+            [fv.name],
+            datetime.utcnow() - timedelta(minutes=5),
+            datetime.utcnow() - timedelta(minutes=0),
+        )
