Skip to content

Commit

Permalink
New SQL template for BQ historical retrieval
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Delacour <[email protected]>
  • Loading branch information
Matt Delacour committed Jun 8, 2021
1 parent 3129dc8 commit 5430c46
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 94 deletions.
196 changes: 109 additions & 87 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ def _get_bigquery_client():
# * Create temporary tables instead of keeping all tables in memory

SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
/*
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
SELECT
*,
Expand All @@ -405,110 +409,128 @@ def _get_bigquery_client():
) AS entity_row_unique_id
FROM {{ left_table_query_string }}
),
{% for featureview in featureviews %}
/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.
1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
Feature values are joined to this table later for improved efficiency.
featureview_timestamp is equal to null in rows from the entity dataset.
*/
{{ featureview.name }}__union_features AS (
SELECT
-- unique identifier for each row in the entity dataset.
entity_row_unique_id,
-- event_timestamp contains the timestamps to join onto
{{entity_df_event_timestamp_col}} AS event_timestamp,
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
NULL as {{ featureview.name }}_feature_timestamp,
-- created timestamp of the feature at the corresponding feature_timestamp
{{ 'NULL as created_timestamp,' if featureview.created_timestamp_column else '' }}
-- select only entities belonging to this feature set
{{ featureview.entities | join(', ')}},
-- boolean for filtering the dataset later
true AS is_entity_table
FROM entity_dataframe
UNION ALL
SELECT
NULL as entity_row_unique_id,
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
false AS is_entity_table
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
*/
{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
),
{{ featureview.name }}__base AS (
SELECT
subquery.*,
entity_dataframe.{{entity_df_event_timestamp_col}} AS entity_timestamp,
entity_dataframe.entity_row_unique_id
FROM {{ featureview.name }}__subquery AS subquery
INNER JOIN entity_dataframe
ON TRUE
AND subquery.event_timestamp <= entity_dataframe.{{entity_df_event_timestamp_col}}
{% if featureview.ttl == 0 %}{% else %}
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.{{entity_df_event_timestamp_col}}, interval {{ featureview.ttl }} second)
{% endif %}
{% for entity in featureview.entities %}
AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
{% endfor %}
),
/*
2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
well as is_entity_table.
Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
in the rows from the entity table should now contain the latest timestamps relative to the row's
event_timestamp.
For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
feature_timestamp to null.
*/
{{ featureview.name }}__joined AS (
SELECT
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{% for feature in featureview.features %}
IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM (
SELECT
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }}
FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
is_entity_table
FROM {{ featureview.name }}__union_features
WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC{{', created_timestamp DESC' if featureview.created_timestamp_column else ''}} ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
)
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp.
We then join the data on the next CTE
*/
{% if featureview.created_timestamp_column %}
{{ featureview.name }}__dedup AS (
SELECT
entity_row_unique_id,
event_timestamp,
MAX(created_timestamp) as created_timestamp,
FROM {{ featureview.name }}__base
GROUP BY entity_row_unique_id, event_timestamp
),
{% endif %}
/*
3. Select only the rows from the entity table, and join the features from the original feature set table
to the dataset using the entity values, feature_timestamp, and created_timestamps.
*/
LEFT JOIN (
SELECT
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
) USING ({{ featureview.name }}_feature_timestamp,{{ ' created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entities | join(', ')}})
WHERE is_entity_table
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
*/
{{ featureview.name }}__latest AS (
SELECT
entity_row_unique_id,
MAX(event_timestamp) AS event_timestamp
{% if featureview.created_timestamp_column %}
,ANY_VALUE(created_timestamp) AS created_timestamp
{% endif %}
FROM {{ featureview.name }}__base
{% if featureview.created_timestamp_column %}
INNER JOIN {{ featureview.name }}__dedup
USING (entity_row_unique_id, event_timestamp, created_timestamp)
{% endif %}
GROUP BY entity_row_unique_id
),
/*
4. Finally, deduplicate the rows by selecting the first occurrence of each entity table entity_row_unique_id.
*/
{{ featureview.name }}__deduped AS (SELECT
k.*
FROM (
SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
FROM {{ featureview.name }}__joined row
GROUP BY entity_row_unique_id
)){% if loop.last %}{% else %}, {% endif %}
4. Once we know the latest value of each feature for a given timestamp,
we can join again the data back to the original "base" dataset
*/
{{ featureview.name }}__cleaned AS (
SELECT base.*
FROM {{ featureview.name }}__base as base
INNER JOIN {{ featureview.name }}__latest
USING(
entity_row_unique_id,
event_timestamp
{% if featureview.created_timestamp_column %}
,created_timestamp
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
The entity_dataframe dataset being our source of truth here.
*/
SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (entity_row_unique_id, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
SELECT * EXCEPT (entity_row_unique_id)
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
SELECT
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.name }}__deduped
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }},
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING (entity_row_unique_id)
{% endfor %}
ORDER BY {{entity_df_event_timestamp_col}}
"""
17 changes: 10 additions & 7 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ def test_historical_features_from_bigquery_sources(
start_date,
) = generate_entities(start_date, infer_event_timestamp_col)

# bigquery_dataset = "test_hist_retrieval_static"
bigquery_dataset = (
f"test_hist_retrieval_{int(time.time_ns())}_{random.randint(1000, 9999)}"
)
Expand Down Expand Up @@ -452,13 +451,16 @@ def test_historical_features_from_bigquery_sources(
)
)

assert sorted(expected_df.columns) == sorted(
actual_df_from_sql_entities.columns
)
assert_frame_equal(
expected_df.sort_values(
by=[event_timestamp, "order_id", "driver_id", "customer_id"]
).reset_index(drop=True),
actual_df_from_sql_entities.sort_values(
by=[event_timestamp, "order_id", "driver_id", "customer_id"]
).reset_index(drop=True),
actual_df_from_sql_entities[expected_df.columns]
.sort_values(by=[event_timestamp, "order_id", "driver_id", "customer_id"])
.reset_index(drop=True),
check_dtype=False,
)

Expand Down Expand Up @@ -532,12 +534,13 @@ def test_historical_features_from_bigquery_sources(
)
)

assert sorted(expected_df.columns) == sorted(actual_df_from_df_entities.columns)
assert_frame_equal(
expected_df.sort_values(
by=[event_timestamp, "order_id", "driver_id", "customer_id"]
).reset_index(drop=True),
actual_df_from_df_entities.sort_values(
by=[event_timestamp, "order_id", "driver_id", "customer_id"]
).reset_index(drop=True),
actual_df_from_df_entities[expected_df.columns]
.sort_values(by=[event_timestamp, "order_id", "driver_id", "customer_id"])
.reset_index(drop=True),
check_dtype=False,
)

0 comments on commit 5430c46

Please sign in to comment.