Skip to content

Commit

Permalink
feat: Add SingleStore as an OnlineStore (feast-dev#4285)
Browse files Browse the repository at this point in the history
Add SingleStore as an OnlineStore

Signed-off-by: Olha Kramarenko <[email protected]>
  • Loading branch information
okramarenko authored Jun 26, 2024
1 parent 372fd75 commit 2c38946
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 6 deletions.
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ test-python-universal-cassandra-no-cloud-providers:
not test_snowflake" \
sdk/python/tests

test-python-universal-singlestore-online:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.singlestore_repo_configuration \
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.singlestore \
python -m pytest -n 8 --integration \
-k "not test_universal_cli and \
not gcs_registry and \
not s3_registry and \
not test_snowflake" \
sdk/python/tests

test-python-universal:
python -m pytest -n 8 --integration sdk/python/tests

Expand Down
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
* [Rockset (contrib)](reference/online-stores/rockset.md)
* [Hazelcast (contrib)](reference/online-stores/hazelcast.md)
* [ScyllaDB (contrib)](reference/online-stores/scylladb.md)
* [SingleStore (contrib)](reference/online-stores/singlestore.md)
* [Providers](reference/providers/README.md)
* [Local](reference/providers/local.md)
* [Google Cloud Platform](reference/providers/google-cloud-platform.md)
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/online-stores/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli

{% content-ref url="remote.md" %}
[remote.md](remote.md)

{% content-ref url="singlestore.md" %}
[singlestore.md](singlestore.md)
{% endcontent-ref %}
51 changes: 51 additions & 0 deletions docs/reference/online-stores/singlestore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# SingleStore online store (contrib)

## Description

The SingleStore online store provides support for materializing feature values into a SingleStore database for serving online features.

## Getting started
In order to use this online store, you'll need to run `pip install 'feast[singlestore]'`. You can get started by then running `feast init` and then setting the `feature_store.yaml` as described below.

## Example

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: singlestore
host: DB_HOST
port: DB_PORT
database: DB_NAME
user: DB_USERNAME
password: DB_PASSWORD
```
{% endcode %}
## Functionality Matrix
The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
Below is a matrix indicating which functionality is supported by the SingleStore online store.
| | SingleStore |
| :-------------------------------------------------------- | :----------- |
| write feature values to the online store | yes |
| read feature values from the online store | yes |
| update infrastructure (e.g. tables) in the online store | yes |
| teardown infrastructure (e.g. tables) in the online store | yes |
| generate a plan of infrastructure changes | no |
| support for on-demand transforms | yes |
| readable by Python SDK | yes |
| readable by Java | no |
| readable by Go | no |
| support for entityless feature views | yes |
| support for concurrent writing to the same key | no |
| support for ttl (time to live) at retrieval | no |
| support for deleting expired data | no |
| collocated by feature view | yes |
| collocated by feature service | no |
| collocated by entity key | no |
To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
8 changes: 8 additions & 0 deletions sdk/python/docs/source/feast.infra.online_stores.contrib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ feast.infra.online\_stores.contrib.postgres\_repo\_configuration module
:undoc-members:
:show-inheritance:

feast.infra.online\_stores.contrib.singlestore\_repo\_configuration module
--------------------------------------------------------------------------

.. automodule:: feast.infra.online_stores.contrib.singlestore_repo_configuration
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------

Expand Down
21 changes: 21 additions & 0 deletions sdk/python/docs/source/feast.infra.registry.contrib.postgres.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
feast.infra.registry.contrib.postgres package
=============================================

Submodules
----------

feast.infra.registry.contrib.postgres.postgres\_registry\_store module
----------------------------------------------------------------------

.. automodule:: feast.infra.registry.contrib.postgres.postgres_registry_store
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------

.. automodule:: feast.infra.registry.contrib.postgres
:members:
:undoc-members:
:show-inheritance:
1 change: 1 addition & 0 deletions sdk/python/docs/source/feast.infra.registry.contrib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Subpackages
:maxdepth: 4

feast.infra.registry.contrib.azure
feast.infra.registry.contrib.postgres

Module contents
---------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from __future__ import absolute_import

from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import pytz
import singlestoredb
from pydantic import StrictStr
from singlestoredb.connection import Connection, Cursor
from singlestoredb.exceptions import InterfaceError

from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel


class SingleStoreOnlineStoreConfig(FeastConfigBaseModel):
"""
Configuration for the SingleStore online store.
NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
"""

type: Literal["singlestore"] = "singlestore"

host: Optional[StrictStr] = None
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
port: Optional[int] = None


class SingleStoreOnlineStore(OnlineStore):
"""
An online store implementation that uses SingleStore.
NOTE: The class *must* end with the `OnlineStore` suffix.
"""

_conn: Optional[Connection] = None

def _init_conn(self, config: RepoConfig) -> Connection:
online_store_config = config.online_store
assert isinstance(online_store_config, SingleStoreOnlineStoreConfig)
return singlestoredb.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "test",
password=online_store_config.password or "test",
database=online_store_config.database or "feast",
port=online_store_config.port or 3306,
autocommit=True,
)

def _get_cursor(self, config: RepoConfig) -> Any:
# This will try to reconnect also.
# In case it fails, we will have to create a new connection.
if not self._conn:
self._conn = self._init_conn(config)
try:
self._conn.ping(reconnect=True)
except InterfaceError:
self._conn = self._init_conn(config)
return self._conn.cursor()

def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
project = config.project
with self._get_cursor(config) as cur:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
).hex()
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)

for feature_name, val in values.items():
insert_values.append(
(
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
)
)
# Control the batch so that we can update the progress
batch_size = 50000
for i in range(0, len(insert_values), batch_size):
current_batch = insert_values[i : i + batch_size]
cur.executemany(
f"""
INSERT INTO {_table_id(project, table)}
(entity_key, feature_name, value, event_ts, created_ts)
values (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
value = VALUES(value),
event_ts = VALUES(event_ts),
created_ts = VALUES(created_ts);
""",
current_batch,
)
if progress:
progress(len(current_batch))

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
project = config.project
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
with self._get_cursor(config) as cur:
keys = []
for entity_key in entity_keys:
keys.append(
serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
).hex()
)

if not requested_features:
entity_key_placeholders = ",".join(["%s" for _ in keys])
cur.execute(
f"""
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
WHERE entity_key IN ({entity_key_placeholders})
ORDER BY event_ts;
""",
tuple(keys),
)
else:
entity_key_placeholders = ",".join(["%s" for _ in keys])
requested_features_placeholders = ",".join(
["%s" for _ in requested_features]
)
cur.execute(
f"""
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders})
ORDER BY event_ts;
""",
tuple(keys + requested_features),
)
rows = cur.fetchall() or []

# Since we don't know the order returned from MySQL we'll need
# to construct a dict to be able to quickly look up the correct row
# when we iterate through the keys since they are in the correct order
values_dict = defaultdict(list)
for row in rows:
values_dict[row[0]].append(row[1:])

for key in keys:
if key in values_dict:
key_values = values_dict[key]
res = {}
res_ts: Optional[datetime] = None
for feature_name, value_bin, event_ts in key_values:
val = ValueProto()
val.ParseFromString(bytes(value_bin))
res[feature_name] = val
res_ts = event_ts
result.append((res_ts, res))
else:
result.append((None, None))
return result

def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
) -> None:
project = config.project
with self._get_cursor(config) as cur:
# We don't create any special state for the entities in this implementation.
for table in tables_to_keep:
cur.execute(
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
feature_name VARCHAR(256),
value BLOB,
event_ts timestamp NULL DEFAULT NULL,
created_ts timestamp NULL DEFAULT NULL,
PRIMARY KEY(entity_key, feature_name),
INDEX {_table_id(project, table)}_ek (entity_key))"""
)

for table in tables_to_delete:
_drop_table_and_index(cur, project, table)

def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
) -> None:
project = config.project
with self._get_cursor(config) as cur:
for table in tables:
_drop_table_and_index(cur, project, table)


def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
table_name = _table_id(project, table)
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
cur.execute(f"DROP TABLE IF EXISTS {table_name}")


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.universal.online_store.singlestore import (
SingleStoreOnlineStoreCreator,
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(online_store_creator=SingleStoreOnlineStoreCreator),
]
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
"elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore",
"remote": "feast.infra.online_stores.remote.RemoteOnlineStore",
"singlestore": "feast.infra.online_stores.contrib.singlestore_online_store.singlestore.SingleStoreOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
Loading

0 comments on commit 2c38946

Please sign in to comment.