Skip to content

Commit

Permalink
Merge branch 'master' into brijesh.vora/SAASMLOPS-910
Browse files Browse the repository at this point in the history
  • Loading branch information
brijesh-vora-sp authored Jan 30, 2024
2 parents 3050970 + 987f0fd commit 0011417
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ COPY README.md README.md
# git dir to infer the version of feast we're installing.
# https://github.com/pypa/setuptools_scm#usage-from-docker
# I think it also assumes that this dockerfile is being built from the root of the directory.
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake]'
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake,postgres]'
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
logging.basicConfig(level=logging.INFO)

with open("/var/feast/feature_store.yaml") as f:
feast_config = yaml.safe_load(f)
feast_config = yaml.load(f, Loader=yaml.Loader)

with open("/var/feast/bytewax_materialization_config.yaml") as b:
bytewax_config = yaml.safe_load(b)
bytewax_config = yaml.load(b, Loader=yaml.Loader)

config = RepoConfig(**feast_config)
store = FeatureStore(config=config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,28 @@ def update(

# We don't create any special state for the entities in this implementation.
for table in tables_to_keep:

table_name = _table_id(project, table)
index_name = f"{table_name}_ek"
cur.execute(
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
feature_name VARCHAR(256),
value BLOB,
event_ts timestamp NULL DEFAULT NULL,
created_ts timestamp NULL DEFAULT NULL,
PRIMARY KEY(entity_key, feature_name))"""
)

cur.execute(
f"ALTER TABLE {_table_id(project, table)} ADD INDEX {_table_id(project, table)}_ek (entity_key);"
index_exists = cur.execute(
f"""
SELECT 1 FROM information_schema.statistics
WHERE table_schema = DATABASE() AND table_name = '{table_name}' AND index_name = '{index_name}'
"""
)
if not index_exists:
cur.execute(
f"ALTER TABLE {table_name} ADD INDEX {index_name} (entity_key);"
)

for table in tables_to_delete:
_drop_table_and_index(cur, project, table)
Expand Down

0 comments on commit 0011417

Please sign in to comment.