From a799056d7c71b63f4f0fea968d436866c7ac0d27 Mon Sep 17 00:00:00 2001 From: Daniel Lin Date: Fri, 16 Dec 2022 22:22:51 -0800 Subject: [PATCH] feat: Add Rockset as an OnlineStore This commit adds Rockset as a contributed online store. It implements the Update, Teardown, Read and Write apis using the Rockset pythonn client. Signed-off-by: Daniel Lin --- docs/SUMMARY.md | 1 + docs/reference/online-stores/README.md | 4 + docs/reference/online-stores/rockset.md | 38 ++ ...ne_stores.contrib.rockset_online_store.rst | 21 + .../feast.infra.online_stores.contrib.rst | 9 + .../contrib/rockset_online_store/__init__.py | 0 .../contrib/rockset_online_store/rockset.py | 525 ++++++++++++++++++ sdk/python/feast/repo_config.py | 1 + sdk/python/feast/templates/rockset/README.md | 21 + .../feast/templates/rockset/__init__.py | 0 .../feast/templates/rockset/bootstrap.py | 28 + .../rockset/feature_repo/feature_store.yaml | 8 + sdk/python/feast/ui_server.py | 1 + .../requirements/py3.10-ci-requirements.txt | 6 + .../requirements/py3.8-ci-requirements.txt | 6 + .../requirements/py3.9-ci-requirements.txt | 6 + .../feature_repos/repo_configuration.py | 11 + setup.py | 5 + 18 files changed, 691 insertions(+) create mode 100644 docs/reference/online-stores/rockset.md create mode 100644 sdk/python/docs/source/feast.infra.online_stores.contrib.rockset_online_store.rst create mode 100644 sdk/python/feast/infra/online_stores/contrib/rockset_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/rockset_online_store/rockset.py create mode 100644 sdk/python/feast/templates/rockset/README.md create mode 100644 sdk/python/feast/templates/rockset/__init__.py create mode 100644 sdk/python/feast/templates/rockset/bootstrap.py create mode 100644 sdk/python/feast/templates/rockset/feature_repo/feature_store.yaml diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 1bab8a61ef..cdca6f3784 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -96,6 +96,7 @@ * [PostgreSQL (contrib)](reference/online-stores/postgres.md) * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) * [MySQL (contrib)](reference/online-stores/mysql.md) + * [Rockset (contrib)](reference/online-stores/rockset.md) * [Providers](reference/providers/README.md) * [Local](reference/providers/local.md) * [Google Cloud Platform](reference/providers/google-cloud-platform.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index e46fc28d16..792d1b9553 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -42,3 +42,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli [mysql.md](mysql.md) {% endcontent-ref %} +{% content-ref url="mysql.md" %} +[rockset.md](rockset.md) +{% endcontent-ref %} + diff --git a/docs/reference/online-stores/rockset.md b/docs/reference/online-stores/rockset.md new file mode 100644 index 0000000000..0ffb29a2d4 --- /dev/null +++ b/docs/reference/online-stores/rockset.md @@ -0,0 +1,38 @@ +# Rockset (contrib) + +## Description + +In Alpha Development. + +The [Rockset](https://rockset.com/demo-signup/) online store provides support for materializing feature values within a Rockset collection for serving online features in real-time. + +* Each document is uniquely identified by its '_id' value. Repeated inserts into the same document '_id' will result in an upsert. + +Rockset indexes all columns allowing for quick per feature look up and also allows for a dynamic typed schema that can change based on any new requirements. ApiKeys can be found in the console +along with host urls which you can find in "View Region Endpoint Urls". + +Data Model Used Per Doc + +``` +{ + "_id": (STRING) Unique Identifier for the feature document. + : (STRING) Feature Values Mapped by Feature Name. Feature + values stored as a serialized hex string. + .... + "event_ts": (STRING) ISO Stringified Timestamp. + "created_ts": (STRING) ISO Stringified Timestamp. +} +``` + + +## Example + +```yaml +project: my_feature_app +registry: data/registry.db +provider: local +online_stores + type: rockset + apikey: MY_APIKEY_HERE + host: api.usw2a1.rockset.com +``` diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rockset_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rockset_online_store.rst new file mode 100644 index 0000000000..b3de7479a0 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rockset_online_store.rst @@ -0,0 +1,21 @@ +feast.infra.online\_stores.contrib.rockset\_online\_store package +================================================================= + +Submodules +---------- + +feast.infra.online\_stores.contrib.rockset\_online\_store.rockset module +------------------------------------------------------------------------ + +.. automodule:: feast.infra.online_stores.contrib.rockset_online_store.rockset + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.online_stores.contrib.rockset_online_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index 6b175f4584..f10ff306f3 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -10,6 +10,7 @@ Subpackages feast.infra.online_stores.contrib.cassandra_online_store feast.infra.online_stores.contrib.hbase_online_store feast.infra.online_stores.contrib.mysql_online_store + feast.infra.online_stores.contrib.rockset_online_store Submodules ---------- @@ -54,6 +55,14 @@ feast.infra.online\_stores.contrib.postgres\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.rockset\_repo\_configuration module +---------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.rockset_repo_configuration + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/sdk/python/feast/infra/online_stores/contrib/rockset_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/rockset_online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/contrib/rockset_online_store/rockset.py b/sdk/python/feast/infra/online_stores/contrib/rockset_online_store/rockset.py new file mode 100644 index 0000000000..37cfbd86af --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/rockset_online_store/rockset.py @@ -0,0 +1,525 @@ +# Copyright 2022 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import os +import random +import time +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, cast + +import requests +from rockset.exceptions import BadRequestException, RocksetException +from rockset.models import QueryRequestSql +from rockset.query_paginator import QueryPaginator +from rockset.rockset_client import RocksetClient + +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.online_stores.helpers import compute_entity_id +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage + +logger = logging.getLogger(__name__) + + +class RocksetOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for Rockset store""" + + type: Literal["rockset"] = "rockset" + """Online store type selector""" + + api_key: Optional[str] = None + """Api Key to be used for Rockset Account. If not set the env var ROCKSET_APIKEY will be used.""" + + host: Optional[str] = None + """The Host Url for Rockset requests. If not set the env var ROCKSET_APISERVER will be used.""" + + read_pagination_batch_size: int = 100 + """Batch size of records that will be turned per page when paginating a batched read""" + + collection_created_timeout_secs: int = 60 + """The amount of time, in seconds, we will wait for the collection to become visible to the API""" + + collection_ready_timeout_secs: int = 30 * 60 + """The amount of time, in seconds, we will wait for the collection to enter READY state""" + + fence_all_writes: bool = True + """Whether to wait for all writes to be flushed from log and queryable. If False, documents that are written may not be seen immediately in subsequent reads""" + + fence_timeout_secs: int = 10 * 60 + """The amount of time we will wait, in seconds, for the write fence to be passed""" + + initial_request_backoff_secs: int = 2 + """Initial backoff, in seconds, we will wait between requests when polling for a response""" + + max_request_backoff_secs: int = 30 + """Initial backoff, in seconds, we will wait between requests when polling for a response""" + + max_request_attempts: int = 10 * 1000 + """The max amount of times we will retry a failed request""" + + +class RocksetOnlineStore(OnlineStore): + """ + Rockset implementation of the online store interface. + + Attributes: + _rockset_client: Rockset openapi client. + """ + + _rockset_client = None + + @log_exceptions_and_usage(online_store="rockset") + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to online Rockset store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, + a dict containing feature values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ + + online_config = config.online_store + assert isinstance(online_config, RocksetOnlineStoreConfig) + + rs = self.get_rockset_client(online_config) + collection_name = self.get_collection_name(config, table) + + # We need to deduplicate on entity_id and we will save the latest timestamp version. + dedup_dict = {} + for feature_vals in data: + entity_key, features, timestamp, created_ts = feature_vals + serialized_key = compute_entity_id( + entity_key=entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + + if serialized_key not in dedup_dict: + dedup_dict[serialized_key] = feature_vals + continue + + # If the entity already existings in the dictionary ignore the entry if it has a lower timestamp. + if timestamp <= dedup_dict[serialized_key][2]: + continue + + dedup_dict[serialized_key] = feature_vals + + request_batch = [] + for serialized_key, feature_vals in dedup_dict.items(): + document = {} + entity_key, features, timestamp, created_ts = feature_vals + document["_id"] = serialized_key + + # Rockset python client currently does not handle datetime correctly and will convert + # to string instead of native Rockset DATETIME. This will be fixed, but until then we + # use isoformat. + document["event_ts"] = timestamp.isoformat() + document["created_ts"] = ( + "" if created_ts is None else created_ts.isoformat() + ) + for k, v in features.items(): + # Rockset client currently does not support bytes type. + document[k] = v.SerializeToString().hex() + + # TODO: Implement async batching with retries. + request_batch.append(document) + + if progress: + progress(1) + + resp = rs.Documents.add_documents( + collection=collection_name, data=request_batch + ) + if online_config.fence_all_writes: + self.wait_for_fence(rs, collection_name, resp["last_offset"], online_config) + + return None + + @log_exceptions_and_usage(online_store="rockset") + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Retrieve feature values from the online Rockset store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read from the FeatureStore. + """ + online_config = config.online_store + assert isinstance(online_config, RocksetOnlineStoreConfig) + + rs = self.get_rockset_client(online_config) + collection_name = self.get_collection_name(config, table) + + feature_list = "" + if requested_features is not None: + feature_list = ",".join(requested_features) + + entity_serialized_key_list = [ + compute_entity_id( + k, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for k in entity_keys + ] + + entity_query_str = ",".join( + "'{id}'".format(id=s) for s in entity_serialized_key_list + ) + + query_str = f""" + SELECT + "_id", + "event_ts", + {feature_list} + FROM + {collection_name} + WHERE + "_id" IN ({entity_query_str}) + """ + + feature_set = set() + if requested_features: + feature_set.update(requested_features) + + result_map = {} + for page in QueryPaginator( + rs, + rs.Queries.query( + sql=QueryRequestSql( + query=query_str, + paginate=True, + initial_paginate_response_doc_count=online_config.read_pagination_batch_size, + ) + ), + ): + for doc in page: + result = {} + for k, v in doc.items(): + if k not in feature_set: + # We want to skip deserializing values that are not feature values like bookeeping values. + continue + + val = ValueProto() + + # TODO: Remove bytes <-> string parsing once client supports bytes. + val.ParseFromString(bytes.fromhex(v)) + result[k] = val + result_map[doc["_id"]] = ( + datetime.fromisoformat(doc["event_ts"]), + result, + ) + + results_list: List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]] + ] = [] + for key in entity_serialized_key_list: + if key not in result_map: + # If not found, we add a gap to let the client know. + results_list.append((None, None)) + continue + + results_list.append(result_map[key]) + + return results_list + + @log_exceptions_and_usage(online_store="rockset") + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update tables from the Rockset Online Store. + + Args: + config: The RepoConfig for the current FeatureStore. + tables_to_delete: Tables to delete from the Rockset Online Store. + tables_to_keep: Tables to keep in the Rockset Online Store. + """ + online_config = config.online_store + assert isinstance(online_config, RocksetOnlineStoreConfig) + rs = self.get_rockset_client(online_config) + + created_collections = [] + for table_instance in tables_to_keep: + try: + collection_name = self.get_collection_name(config, table_instance) + rs.Collections.create_file_upload_collection(name=collection_name) + created_collections.append(collection_name) + except BadRequestException as e: + if self.parse_request_error_type(e) == "AlreadyExists": + # Table already exists nothing to do. We should still make sure it is ready though. + created_collections.append(collection_name) + continue + raise + + for table_to_delete in tables_to_delete: + self.delete_collection( + rs, collection_name=self.get_collection_name(config, table_to_delete) + ) + + # Now wait for all collections to be READY. + self.wait_for_ready_collections( + rs, created_collections, online_config=online_config + ) + + @log_exceptions_and_usage(online_store="rockset") + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Delete all collections from the Rockset Online Store. + + Args: + config: The RepoConfig for the current FeatureStore. + tables: Tables to delete from the feature repo. + """ + online_config = config.online_store + assert isinstance(online_config, RocksetOnlineStoreConfig) + rs = self.get_rockset_client(online_config) + for table in tables: + self.delete_collection( + rs, collection_name=self.get_collection_name(config, table) + ) + + def get_rockset_client( + self, onlineConfig: RocksetOnlineStoreConfig + ) -> RocksetClient: + """ + Fetches the RocksetClient to be used for all requests for this online store based on the api + configuration in the provided config. If no configuration provided local ENV vars will be used. + + Args: + onlineConfig: The RocksetOnlineStoreConfig associated with this online store. + """ + if self._rockset_client is not None: + return self._rockset_client + + _api_key = ( + os.getenv("ROCKSET_APIKEY") + if isinstance(onlineConfig.api_key, type(None)) + else onlineConfig.api_key + ) + _host = ( + os.getenv("ROCKSET_APISERVER") + if isinstance(onlineConfig.host, type(None)) + else onlineConfig.host + ) + self._rockset_client = RocksetClient(host=_host, api_key=_api_key) + return self._rockset_client + + @staticmethod + def delete_collection(rs: RocksetClient, collection_name: str): + """ + Deletes the collection whose name was provided + + Args: + rs: The RocksetClient to be used for the deletion. + collection_name: The name of the collection to be deleted. + """ + + try: + rs.Collections.delete(collection=collection_name) + except RocksetException as e: + if RocksetOnlineStore.parse_request_error_type(e) == "NotFound": + logger.warning( + f"Trying to delete collection that does not exist {collection_name}" + ) + return + raise + + @staticmethod + def get_collection_name(config: RepoConfig, feature_view: FeatureView) -> str: + """ + Returns the collection name based on the provided config and FeatureView. + + Args: + config: RepoConfig for the online store. + feature_view: FeatureView that is backed by the returned collection name. + + Returns: + The collection name as a string. + """ + project_val = config.project if config.project else "feast" + table_name = feature_view.name if feature_view.name else "feature_store" + return f"{project_val}_{table_name}" + + @staticmethod + def parse_request_error_type(e: RocksetException) -> str: + """ + Parse a throw RocksetException. Will return a string representing the type of error that was thrown. + + Args: + e: The RockException that is being parsed. + + Returns: + Error type parsed as a string. + """ + + body_dict = json.loads(e.body) + return body_dict["type"] + + @staticmethod + def wait_for_fence( + rs: RocksetClient, + collection_name: str, + last_offset: str, + online_config: RocksetOnlineStoreConfig, + ): + """ + Waits until 'last_offset' is flushed and values are ready to be read. If wait lasts longer than the timeout specified in config + a timeout exception will be throw. + + Args: + rs: Rockset client that will be used to make all requests. + collection_name: Collection associated with the offsets we are waiting for. + last_offset: The actual offsets we are waiting to be flushed. + online_config: The config that will be used to determine timeouts and backout configurations. + """ + + resource_path = ( + f"/v1/orgs/self/ws/commons/collections/{collection_name}/offsets/commit" + ) + request = {"name": [last_offset]} + + headers = {} + headers["Content-Type"] = "application/json" + headers["Authorization"] = f"ApiKey {rs.api_client.configuration.api_key}" + + t_start = time.time() + for num_attempts in range(online_config.max_request_attempts): + delay = time.time() - t_start + resp = requests.post( + url=f"{rs.api_client.configuration.host}{resource_path}", + json=request, + headers=headers, + ) + + if resp.status_code == 200 and resp.json()["data"]["passed"] is True: + break + + if delay > online_config.fence_timeout_secs: + raise TimeoutError( + f"Write to collection {collection_name} at offset {last_offset} was not available for read after {delay} secs" + ) + + if resp.status_code == 429: + RocksetOnlineStore.backoff_sleep(num_attempts, online_config) + continue + elif resp.status_code != 200: + raise Exception(f"[{resp.status_code}]: {resp.reason}") + + RocksetOnlineStore.backoff_sleep(num_attempts, online_config) + + @staticmethod + def wait_for_ready_collections( + rs: RocksetClient, + collection_names: List[str], + online_config: RocksetOnlineStoreConfig, + ): + """ + Waits until all collections provided have entered READY state and can accept new documents. If wait + lasts longer than timeout a TimeoutError exception will be thrown. + + Args: + rs: Rockset client that will be used to make all requests. + collection_names: All collections that we will wait for. + timeout: The max amount of time we will wait for the collections to become READY. + """ + + t_start = time.time() + for cname in collection_names: + # We will wait until the provided timeout for all collections to become READY. + for num_attempts in range(online_config.max_request_attempts): + resp = None + delay = time.time() - t_start + try: + resp = rs.Collections.get(collection=cname) + except RocksetException as e: + error_type = RocksetOnlineStore.parse_request_error_type(e) + if error_type == "NotFound": + if delay > online_config.collection_created_timeout_secs: + raise TimeoutError( + f"Collection {cname} failed to become visible after {delay} seconds" + ) + elif error_type == "RateLimitExceeded": + RocksetOnlineStore.backoff_sleep(num_attempts, online_config) + continue + else: + raise + + if ( + resp is not None + and cast(Dict[str, dict], resp)["data"]["status"] == "READY" + ): + break + + if delay > online_config.collection_ready_timeout_secs: + raise TimeoutError( + f"Collection {cname} failed to become ready after {delay} seconds" + ) + + RocksetOnlineStore.backoff_sleep(num_attempts, online_config) + + @staticmethod + def backoff_sleep(attempts: int, online_config: RocksetOnlineStoreConfig): + """ + Sleep for the needed amount of time based on the number of request attempts. + + Args: + backoff: The amount of time we will sleep for + max_backoff: The max amount of time we should ever backoff for. + rate_limited: Whether this method is being called as part of a rate limited request. + """ + + default_backoff = online_config.initial_request_backoff_secs + + # Full jitter, exponential backoff. + backoff = random.uniform( + default_backoff, + min(default_backoff << attempts, online_config.max_request_backoff_secs), + ) + time.sleep(backoff) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 673d039ff0..28847294b3 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -53,6 +53,7 @@ "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", "cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore", + "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/rockset/README.md b/sdk/python/feast/templates/rockset/README.md new file mode 100644 index 0000000000..d4f1ef6faf --- /dev/null +++ b/sdk/python/feast/templates/rockset/README.md @@ -0,0 +1,21 @@ +# Feast Quickstart +A quick view of what's in this repository: + +* `data/` contains raw demo parquet data +* `feature_repo/driver_repo.py` contains demo feature definitions +* `feature_repo/feature_store.yaml` contains a demo setup configuring where data sources are +* `test_workflow.py` showcases how to run all key Feast commands, including defining, retrieving, and pushing features. + +You can run the overall workflow with `python test_workflow.py`. + +## To move from this into a more production ready workflow: +> See more details in [Running Feast in production](https://docs.feast.dev/how-to-guides/running-feast-in-production) + +1. `feature_store.yaml` points to a local file as a registry. You'll want to setup a remote file (e.g. in S3/GCS) or a + SQL registry. See [registry docs](https://docs.feast.dev/getting-started/concepts/registry) for more details. +2. Setup CI/CD + dev vs staging vs prod environments to automatically update the registry as you change Feast feature definitions. See [docs](https://docs.feast.dev/how-to-guides/running-feast-in-production#1.-automatically-deploying-changes-to-your-feature-definitions). +3. (optional) Regularly scheduled materialization to power low latency feature retrieval (e.g. via Airflow). See [Batch data ingestion](https://docs.feast.dev/getting-started/concepts/data-ingestion#batch-data-ingestion) + for more details. +4. (optional) Deploy feature server instances with `feast serve` to expose endpoints to retrieve online features. + - See [Python feature server](https://docs.feast.dev/reference/feature-servers/python-feature-server) for details. + - Use cases can also directly call the Feast client to fetch features as per [Feature retrieval](https://docs.feast.dev/getting-started/concepts/feature-retrieval) diff --git a/sdk/python/feast/templates/rockset/__init__.py b/sdk/python/feast/templates/rockset/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/rockset/bootstrap.py b/sdk/python/feast/templates/rockset/bootstrap.py new file mode 100644 index 0000000000..ad9c8a5f80 --- /dev/null +++ b/sdk/python/feast/templates/rockset/bootstrap.py @@ -0,0 +1,28 @@ +import click + +from feast.file_utils import replace_str_in_file + + +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + import pathlib + + repo_path = pathlib.Path(__file__).parent.absolute() / "feature_repo" + config_file = repo_path / "feature_store.yaml" + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + rockset_apikey = click.prompt( + "Rockset Api Key (If blank will be read from ROCKSET_APIKEY in ENV):" + ) + + rockset_host = click.prompt( + "Rockset Host (If blank will be read from ROCKSET_APISERVER in ENV):" + ) + + replace_str_in_file(config_file, "ROCKSET_APIKEY", rockset_apikey) + replace_str_in_file(config_file, "ROCKSET_APISERVER", rockset_host) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/rockset/feature_repo/feature_store.yaml b/sdk/python/feast/templates/rockset/feature_repo/feature_store.yaml new file mode 100644 index 0000000000..57cf8e73bb --- /dev/null +++ b/sdk/python/feast/templates/rockset/feature_repo/feature_store.yaml @@ -0,0 +1,8 @@ +project: my_project +registry: registry.db +provider: local +online_store: + type: rockset + api_key: ROCKSET_APIKEY + host: ROCKSET_APISERVER # (api.usw2a1.rockset.com, api.euc1a1.rockset.com, api.use1a1.rockset.com) +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index 94860bdf73..4971b5dfae 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -111,4 +111,5 @@ def start_server( host, port, ) + assert root_path is not None uvicorn.run(app, host=host, port=port, root_path=root_path) diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index c001cbae61..7e2a9e7de8 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -220,6 +220,8 @@ fsspec==2022.1.0 # s3fs gcsfs==2022.1.0 # via feast (setup.py) +geojson==2.5.0 + # via rockset geomet==0.2.1.post1 # via cassandra-driver google-api-core[grpc]==2.11.0 @@ -627,6 +629,7 @@ python-dateutil==2.8.2 # kubernetes # moto # pandas + # rockset python-dotenv==0.21.1 # via uvicorn pytz==2022.7.1 @@ -679,6 +682,8 @@ responses==0.22.0 # via moto rfc3986[idna2008]==1.5.0 # via httpx +rockset==1.0.3 + # via feast (setup.py) rsa==4.9 # via google-auth ruamel-yaml==0.17.17 @@ -831,6 +836,7 @@ urllib3==1.26.14 # minio # requests # responses + # rockset # snowflake-connector-python uvicorn[standard]==0.20.0 # via feast (setup.py) diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index ba05d8fe4c..6e6b823844 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -224,6 +224,8 @@ fsspec==2022.1.0 # s3fs gcsfs==2022.1.0 # via feast (setup.py) +geojson==2.5.0 + # via rockset geomet==0.2.1.post1 # via cassandra-driver google-api-core[grpc]==2.11.0 @@ -637,6 +639,7 @@ python-dateutil==2.8.2 # kubernetes # moto # pandas + # rockset python-dotenv==0.21.1 # via uvicorn pytz==2022.7.1 @@ -689,6 +692,8 @@ responses==0.22.0 # via moto rfc3986[idna2008]==1.5.0 # via httpx +rockset==1.0.3 + # via feast (setup.py) rsa==4.9 # via google-auth ruamel-yaml==0.17.17 @@ -846,6 +851,7 @@ urllib3==1.26.14 # minio # requests # responses + # rockset # snowflake-connector-python uvicorn[standard]==0.20.0 # via feast (setup.py) diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 1ec1c03fd1..c75e790163 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -220,6 +220,8 @@ fsspec==2022.1.0 # s3fs gcsfs==2022.1.0 # via feast (setup.py) +geojson==2.5.0 + # via rockset geomet==0.2.1.post1 # via cassandra-driver google-api-core[grpc]==2.11.0 @@ -629,6 +631,7 @@ python-dateutil==2.8.2 # kubernetes # moto # pandas + # rockset python-dotenv==0.21.1 # via uvicorn pytz==2022.7.1 @@ -681,6 +684,8 @@ responses==0.22.0 # via moto rfc3986[idna2008]==1.5.0 # via httpx +rockset==1.0.3 + # via feast (setup.py) rsa==4.9 # via google-auth ruamel-yaml==0.17.17 @@ -838,6 +843,7 @@ urllib3==1.26.14 # minio # requests # responses + # rockset # snowflake-connector-python uvicorn[standard]==0.20.0 # via feast (setup.py) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 174b0b91ad..bb864cdbe7 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -93,6 +93,12 @@ "instance": os.getenv("BIGTABLE_INSTANCE_ID", "feast-integration-tests"), } +ROCKSET_CONFIG = { + "type": "rockset", + "api_key": os.getenv("ROCKSET_APIKEY", ""), + "host": os.getenv("ROCKSET_APISERVER", "api.rs2.usw2.rockset.com"), +} + OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, DataSourceCreator] = { "file": ("local", FileDataSourceCreator), "bigquery": ("gcp", BigQueryDataSourceCreator), @@ -126,6 +132,11 @@ AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None) AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None) + # Uncomment to test using private Rockset account. Currently not enabled as + # there is no dedicated Rockset instance for CI testing and there is no + # containerized version of Rockset. + # AVAILABLE_ONLINE_STORES["rockset"] = (ROCKSET_CONFIG, None) + full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) if full_repo_configs_module is not None: diff --git a/setup.py b/setup.py index 7f7be30124..f89448487e 100644 --- a/setup.py +++ b/setup.py @@ -140,6 +140,10 @@ "pymssql", ] +ROCKSET_REQUIRED = [ + "rockset>=1.0.3", +] + CI_REQUIRED = ( [ "build", @@ -197,6 +201,7 @@ + HBASE_REQUIRED + CASSANDRA_REQUIRED + AZURE_REQUIRED + + ROCKSET_REQUIRED )