From f8d3890f9f049c4b9190456b071e0fdb29aae69e Mon Sep 17 00:00:00 2001 From: Adam Schmidt Date: Tue, 4 Apr 2023 01:58:46 +1000 Subject: [PATCH] fix: Snowflake remote storage (#3574) * fix: Snowflake remote storage Signed-off-by: adamschmidt * fix: lint Signed-off-by: adamschmidt * fix: field string build Signed-off-by: adamschmidt * fix: join typo Signed-off-by: adamschmidt * fix: formatting Signed-off-by: adamschmidt --------- Signed-off-by: adamschmidt --- .../feast/infra/offline_stores/snowflake.py | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 404927146a..1dc18256fa 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -109,6 +109,9 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): blob_export_location: Optional[str] = None """ Location (in S3, Google storage or Azure storage) where data is offloaded """ + convert_timestamp_columns: Optional[bool] = None + """ Convert timestamp columns on export to a Parquet-supported format """ + class Config: allow_population_by_field_name = True @@ -152,6 +155,29 @@ def pull_latest_from_table_or_query( + '"' ) + if config.offline_store.convert_timestamp_columns: + select_fields = list( + map( + lambda field_name: f'"{field_name}"', + join_key_columns + feature_name_columns, + ) + ) + select_timestamps = list( + map( + lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", + timestamp_columns, + ) + ) + inner_field_string = ", ".join(select_fields + select_timestamps) + else: + select_fields = list( + map( + lambda field_name: f'"{field_name}"', + join_key_columns + feature_name_columns + timestamp_columns, + ) + ) + inner_field_string = ", ".join(select_fields) + if data_source.snowflake_options.warehouse: config.offline_store.warehouse = data_source.snowflake_options.warehouse @@ -166,7 +192,7 @@ def pull_latest_from_table_or_query( {field_string} {f''', TRIM({repr(DUMMY_ENTITY_VAL)}::VARIANT,'"') AS "{DUMMY_ENTITY_ID}"''' if not join_key_columns else ""} FROM ( - SELECT {field_string}, + SELECT {inner_field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row" FROM {from_expression} WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' @@ -533,7 +559,7 @@ def to_remote_storage(self) -> List[str]: self.to_snowflake(table) query = f""" - COPY INTO '{self.config.offline_store.blob_export_location}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n + COPY INTO '{self.export_path}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n STORAGE_INTEGRATION = {self.config.offline_store.storage_integration_name}\n FILE_FORMAT = (TYPE = PARQUET) DETAILED_OUTPUT = TRUE