diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 40ec27e7d9..56c7bc1f65 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -124,19 +124,15 @@ def __init__( f'"{self.registry_config.database}"."{self.registry_config.schema_}"' ) - if not self._verify_registry_database(): - # Verify the existing resitry database schema from snowflake. If any table names and column types is wrong, run table recreation SQL. - with GetSnowflakeConnection(self.registry_config) as conn: - sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" - with open(sql_function_file, "r") as file: - sqlFile = file.read() - - sqlCommands = sqlFile.split(";") - for command in sqlCommands: - query = command.replace( - "REGISTRY_PATH", f"{self.registry_path}" - ) - execute_snowflake_statement(conn, query) + with GetSnowflakeConnection(self.registry_config) as conn: + sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" + with open(sql_function_file, "r") as file: + sqlFile = file.read() + + sqlCommands = sqlFile.split(";") + for command in sqlCommands: + query = command.replace("REGISTRY_PATH", f"{self.registry_path}") + execute_snowflake_statement(conn, query) self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) @@ -149,55 +145,6 @@ def __init__( ) self.project = project - def _verify_registry_database( - self, - ) -> bool: - """Verify the records in registry database. To check: - 1, the 11 tables are existed. - 2, the column types are correct. - - Example return from snowflake's cursor.describe("SELECT * FROM a_table") command: - [ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), - ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), - ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False), - ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)] - - Returns: - True if the necessary 11 tables are existed in Snowflake and schema of each table is correct. - False if failure happens. - """ - - from feast.infra.utils.snowflake.registry.snowflake_registry_table import ( - snowflake_registry_table_names_and_column_types as expect_tables, - ) - - res = True - - try: - with GetSnowflakeConnection(self.registry_config) as conn: - for table_name in expect_tables: - result_metadata_list = conn.cursor().describe( - f"SELECT * FROM {table_name}" - ) - for col in result_metadata_list: - if ( - expect_tables[table_name][col.name]["type_code"] - != col.type_code - ): - res = False - break - except Exception as e: - res = False # Set to False for all errors. - logger.debug( - f"Failed to verify Registry tables and columns types with exception: {e}." - ) - finally: - # The implementation in snowflake_utils.py will cache the established connection without re-connection logic. - # conn.close() - pass - - return res - def refresh(self, project: Optional[str] = None): if project: project_metadata = proto_registry_utils.get_project_metadata( diff --git a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py deleted file mode 100644 index d24fbc27ec..0000000000 --- a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py +++ /dev/null @@ -1,104 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -The table names and column types are following the creation detail listed -in "snowflake_table_creation.sql". - -Snowflake Reference: -1, ResultMetadata: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-resultmetadata-object -2, Type Codes: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-type-codes ----------------------------------------------- -type_code String Representation Data Type -0 FIXED NUMBER/INT -1 REAL REAL -2 TEXT VARCHAR/STRING -3 DATE DATE -4 TIMESTAMP TIMESTAMP -5 VARIANT VARIANT -6 TIMESTAMP_LTZ TIMESTAMP_LTZ -7 TIMESTAMP_TZ TIMESTAMP_TZ -8 TIMESTAMP_NTZ TIMESTAMP_TZ -9 OBJECT OBJECT -10 ARRAY ARRAY -11 BINARY BINARY -12 TIME TIME -13 BOOLEAN BOOLEAN ----------------------------------------------- - -(last update: 2023-11-30) - -""" - -snowflake_registry_table_names_and_column_types = { - "DATA_SOURCES": { - "DATA_SOURCE_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "DATA_SOURCE_PROTO": {"type_code": 11, "type": "BINARY"}, - }, - "ENTITIES": { - "ENTITY_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "ENTITY_PROTO": {"type_code": 11, "type": "BINARY"}, - }, - "FEAST_METADATA": { - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "METADATA_KEY": {"type_code": 2, "type": "VARCHAR"}, - "METADATA_VALUE": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - }, - "FEATURE_SERVICES": { - "FEATURE_SERVICE_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "FEATURE_SERVICE_PROTO": {"type_code": 11, "type": "BINARY"}, - }, - "FEATURE_VIEWS": { - "FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, - "MATERIALIZED_INTERVALS": {"type_code": 11, "type": "BINARY"}, - "USER_METADATA": {"type_code": 11, "type": "BINARY"}, - }, - "MANAGED_INFRA": { - "INFRA_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "INFRA_PROTO": {"type_code": 11, "type": "BINARY"}, - }, - "ON_DEMAND_FEATURE_VIEWS": { - "ON_DEMAND_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "ON_DEMAND_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, - "USER_METADATA": {"type_code": 11, "type": "BINARY"}, - }, - "REQUEST_FEATURE_VIEWS": { - "REQUEST_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "REQUEST_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, - "USER_METADATA": {"type_code": 11, "type": "BINARY"}, - }, - "SAVED_DATASETS": { - "SAVED_DATASET_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "SAVED_DATASET_PROTO": {"type_code": 11, "type": "BINARY"}, - }, - "STREAM_FEATURE_VIEWS": { - "STREAM_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "STREAM_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, - "USER_METADATA": {"type_code": 11, "type": "BINARY"}, - }, - "VALIDATION_REFERENCES": { - "VALIDATION_REFERENCE_NAME": {"type_code": 2, "type": "VARCHAR"}, - "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, - "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, - "VALIDATION_REFERENCE_PROTO": {"type_code": 11, "type": "BINARY"}, - }, -} diff --git a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py index 3a56619bdb..a4cda89a6f 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py @@ -49,19 +49,19 @@ def __init__(self, config: str, autocommit=True): def __enter__(self): - assert self.config.type in { + assert self.config.type in [ "snowflake.registry", "snowflake.offline", "snowflake.engine", "snowflake.online", - } + ] if self.config.type not in _cache: if self.config.type == "snowflake.registry": config_header = "connections.feast_registry" elif self.config.type == "snowflake.offline": config_header = "connections.feast_offline_store" - elif self.config.type == "snowflake.engine": + if self.config.type == "snowflake.engine": config_header = "connections.feast_batch_engine" elif self.config.type == "snowflake.online": config_header = "connections.feast_online_store" @@ -113,11 +113,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): def assert_snowflake_feature_names(feature_view: FeatureView) -> None: for feature in feature_view.features: - assert feature.name not in { + assert feature.name not in [ "entity_key", "feature_name", "feature_value", - }, f"Feature Name: {feature.name} is a protected name to ensure query stability" + ], f"Feature Name: {feature.name} is a protected name to ensure query stability" return None