Skip to content

Commit

Permalink
revert: Verify the existence of Registry tables in snowflake… (#3907)
Browse files Browse the repository at this point in the history
Revert "fix: Verify the existence of Registry tables in snowflake before calling CREATE sql command. Allow read-only user to call feast apply. (#3851)"

This reverts commit 9a3590e.

Signed-off-by: Edson Tirelli <[email protected]>
  • Loading branch information
etirelli authored Jan 25, 2024
1 parent 9a3590e commit c0d358a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 171 deletions.
71 changes: 9 additions & 62 deletions sdk/python/feast/infra/registry/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,15 @@ def __init__(
f'"{self.registry_config.database}"."{self.registry_config.schema_}"'
)

if not self._verify_registry_database():
# Verify the existing registry database schema in Snowflake. If any table name or column type is wrong, run the table-creation SQL.
with GetSnowflakeConnection(self.registry_config) as conn:
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
with open(sql_function_file, "r") as file:
sqlFile = file.read()

sqlCommands = sqlFile.split(";")
for command in sqlCommands:
query = command.replace(
"REGISTRY_PATH", f"{self.registry_path}"
)
execute_snowflake_statement(conn, query)
with GetSnowflakeConnection(self.registry_config) as conn:
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
with open(sql_function_file, "r") as file:
sqlFile = file.read()

sqlCommands = sqlFile.split(";")
for command in sqlCommands:
query = command.replace("REGISTRY_PATH", f"{self.registry_path}")
execute_snowflake_statement(conn, query)

self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
Expand All @@ -149,55 +145,6 @@ def __init__(
)
self.project = project

def _verify_registry_database(
    self,
) -> bool:
    """Verify the registry tables in Snowflake against the expected schema.

    For every table listed in ``snowflake_registry_table_names_and_column_types``
    this checks that:
    1. the table can be described (i.e. it exists and is readable), and
    2. every column it returns is an expected column with the expected type code.

    Example return from snowflake's cursor.describe("SELECT * FROM a_table") command:
    [ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
    ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
    ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False),
    ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)]

    Returns:
        True if all expected tables exist in Snowflake and the type code of
        each described column matches the expectation.
        False on the first mismatch, or if any error occurs while verifying.
    """

    from feast.infra.utils.snowflake.registry.snowflake_registry_table import (
        snowflake_registry_table_names_and_column_types as expect_tables,
    )

    try:
        # The implementation in snowflake_utils.py will cache the established
        # connection without re-connection logic, so the connection is not
        # closed explicitly here.
        with GetSnowflakeConnection(self.registry_config) as conn:
            # One cursor is enough; describe() only fetches result metadata.
            cursor = conn.cursor()
            for table_name, expected_columns in expect_tables.items():
                for col in cursor.describe(f"SELECT * FROM {table_name}"):
                    expected = expected_columns.get(col.name)
                    # An unknown column or a differing type code both mean the
                    # schema is wrong; stop at once instead of describing the
                    # remaining tables.
                    if expected is None or expected["type_code"] != col.type_code:
                        logger.debug(
                            f"Registry table {table_name} column {col.name} does not match the expected schema."
                        )
                        return False
    except Exception as e:
        # Deliberately broad: any failure (missing table, missing privileges,
        # connection problems) is reported as "not verified" to the caller.
        logger.debug(
            f"Failed to verify Registry tables and columns types with exception: {e}."
        )
        return False

    return True

def refresh(self, project: Optional[str] = None):
if project:
project_metadata = proto_registry_utils.get_project_metadata(
Expand Down

This file was deleted.

10 changes: 5 additions & 5 deletions sdk/python/feast/infra/utils/snowflake/snowflake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ def __init__(self, config: str, autocommit=True):

def __enter__(self):

assert self.config.type in {
assert self.config.type in [
"snowflake.registry",
"snowflake.offline",
"snowflake.engine",
"snowflake.online",
}
]

if self.config.type not in _cache:
if self.config.type == "snowflake.registry":
config_header = "connections.feast_registry"
elif self.config.type == "snowflake.offline":
config_header = "connections.feast_offline_store"
elif self.config.type == "snowflake.engine":
if self.config.type == "snowflake.engine":
config_header = "connections.feast_batch_engine"
elif self.config.type == "snowflake.online":
config_header = "connections.feast_online_store"
Expand Down Expand Up @@ -113,11 +113,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def assert_snowflake_feature_names(feature_view: FeatureView) -> None:
for feature in feature_view.features:
assert feature.name not in {
assert feature.name not in [
"entity_key",
"feature_name",
"feature_value",
}, f"Feature Name: {feature.name} is a protected name to ensure query stability"
], f"Feature Name: {feature.name} is a protected name to ensure query stability"
return None


Expand Down

0 comments on commit c0d358a

Please sign in to comment.