redshift.py
from typing import Dict, Optional

import pandas as pd

from feast import RedshiftSource
from feast.data_source import DataSource
from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
from feast.infra.utils import aws_utils
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)


class RedshiftDataSourceCreator(DataSourceCreator):
    """Creates Redshift-backed data sources for Feast's universal integration tests."""

    # Tables created by this creator, tracked so teardown() can drop them.
    tables = []

    def __init__(self, project_name: str):
        super().__init__()
        self.project_name = project_name
        self.client = aws_utils.get_redshift_data_client("us-west-2")
        self.s3 = aws_utils.get_s3_resource("us-west-2")

        # Offline store config pointing at the shared integration-test cluster.
        self.offline_store_config = RedshiftOfflineStoreConfig(
            cluster_id="feast-integration-tests",
            region="us-west-2",
            user="admin",
            database="feast",
            s3_staging_location="s3://feast-integration-tests/redshift/tests/ingestion",
            iam_role="arn:aws:iam::402087665549:role/redshift_s3_access_role",
        )

    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        suffix: Optional[str] = None,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        field_mapping: Optional[Dict[str, str]] = None,
    ) -> DataSource:
        destination_name = self.get_prefixed_table_name(destination_name)

        # Stage the dataframe as Parquet in S3, then COPY it into Redshift.
        aws_utils.upload_df_to_redshift(
            self.client,
            self.offline_store_config.cluster_id,
            self.offline_store_config.database,
            self.offline_store_config.user,
            self.s3,
            f"{self.offline_store_config.s3_staging_location}/copy/{destination_name}.parquet",
            self.offline_store_config.iam_role,
            destination_name,
            df,
        )

        self.tables.append(destination_name)

        return RedshiftSource(
            table=destination_name,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping=field_mapping or {"ts_1": "ts"},
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        return self.offline_store_config

    def get_prefixed_table_name(self, suffix: str) -> str:
        return f"{self.project_name}_{suffix}"

    def teardown(self):
        # Drop every table this creator uploaded during the test run.
        for table in self.tables:
            aws_utils.execute_redshift_statement(
                self.client,
                self.offline_store_config.cluster_id,
                self.offline_store_config.database,
                self.offline_store_config.user,
                f"DROP TABLE IF EXISTS {table}",
            )