-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add Cassandra/AstraDB online store contribution #2873
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2873 +/- ##
==========================================
+ Coverage 67.64% 76.67% +9.02%
==========================================
Files 167 198 +31
Lines 14696 16344 +1648
==========================================
+ Hits 9941 12531 +2590
+ Misses 4755 3813 -942
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @hemidactylus ! This generally looks good to me. one thing that I would add is to add the spec for how data is stored in C* or Astra here: https://github.com/feast-dev/feast/tree/master/docs/specs
Other than that, I think the main gap is in testing. What kind of support do you need there to get this over the line?
cache_key = statement | ||
if cache_key not in self._prepared_statements: | ||
logger.info(f"Preparing a {op_name} statement on {fqtable}.") | ||
self._prepared_statements[cache_key] = session.prepare(statement) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, what effect does preparing sessions have here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preparing statements means that the "shape" of the statement is exchanged once between client and DB servers and metadata info (schema, types) is cached on both sides under an ID to be re-used later any time the command is sent to the DB (read or write), reducing the parsing-time overhead basically. (See https://docs.datastax.com/en/developer/python-driver/3.25/getting_started/#prepared-statement if you are interested.)
INSERT_CQL_5_TEMPLATE = ( | ||
"INSERT INTO {fqtable} (feature_name, " | ||
"value, entity_key, event_ts, created_ts)" | ||
" VALUES (?, ?, ?, ?, ?);" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're in the process of deprecating created_ts in other sources so I would be tempted to keep this out of the implementation for now if feasible..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All right, I'll be ignoring created_ts
even if non-null passed within data
to function online_write_batch
.
In other words, I'll pretend created_ts
is always None
and go from there.
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] | ||
|
||
for entity_key in entity_keys: | ||
entity_key_bin = serialize_entity_key(entity_key).hex() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment here
I added the specifications for this online store in |
29a8c0c
to
ce651b4
Compare
The Cassandra plugin now is covered by the universal online-store tests with Dockerized local cassandra instance. (Thanks to the Feast team for precious guidance!) |
you'll need to also sign your commits too! |
@hemidactylus thanks for your hard work! I'll be reviewing this today. But like @adchia mentioned do you mind signing your commits? |
Hello, I should have signed the commit. Thank you for your support and sorry for the inconvenience! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks great! We still need to add some unit tests I think for the online read and write functionality. @hemidactylus Also, need to fix the integration tests.
sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py
Outdated
Show resolved
Hide resolved
sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py
Outdated
Show resolved
Hide resolved
sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py
Outdated
Show resolved
Hide resolved
sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py
Outdated
Show resolved
Hide resolved
self, | ||
config: RepoConfig, | ||
project: str, | ||
table: FeatureView, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would rather you name this featureView to prevent confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit reluctant to rename ths parameter other than table
: it has this name on any other online store implementation, plus in several places in the test code there are invocations to the method that use table
for keyword-arguments.
Wouldn't that break some tests (or worse) ?
apply_cassandra_store_settings(config_file, settings) | ||
|
||
|
||
def replace_str_in_file(file_path, match_str, sub_str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move these to utils so we can use them for other use cases too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done (this led me to create a top-level "file_utils.py' module and touch several template-related modules as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few last nits, and a rebase is probably necessary (which may reformat all the python files), but after that I think we're ready to merge!
keyspace_creation_command = ( | ||
"create KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};" % keyspace_name | ||
) | ||
self.container.exec('cqlsh -e "%s"' % keyspace_creation_command) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, we usually prefer f-strings in the codebase these days.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, thank you for pointing out.
project = config.project | ||
# | ||
for entity_key, values, timestamp, created_ts in data: | ||
entity_key_bin = serialize_entity_key(entity_key).hex() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strongly recommend using entity_key_serialization_version=2
in serialize_entity_key
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I'm glad you noticed that!
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] | ||
|
||
for entity_key in entity_keys: | ||
entity_key_bin = serialize_entity_key(entity_key).hex() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment here
hey! wanted to double check here and make sure things were still going well? |
Hello! I was OOTO for some time, sorry. I got back to working on this PR right now, thank you for the feedback! |
@hemidactylus The PR looks pretty good but there is something about the way you setup the test environment that is causing the pr integration tests to hang. Could you please take a look at that? |
Thank you for spotting that. Fixed now. |
* Refactor file-editing to a shared utils module * Use f-strings in the CassandraOnlineStoreCreator * Specify version 2 in serializing to make the entity key * Remove unnecessary empty comment lines * Rename proj to columns in _read_rows_by_entity_key * Introduce Cassandra-specific pytest targets * Adapt roadmaps and docs to cover/index Cassandra online store * Add license notes to code files Signed-off-by: Stefano Lottini <[email protected]>
Signed-off-by: Danny Chiao <[email protected]>
Signed-off-by: Danny Chiao <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: achals, adchia, hemidactylus The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
# [0.24.0](v0.23.0...v0.24.0) (2022-08-25) ### Bug Fixes * Check if on_demand_feature_views is an empty list rather than None for snowflake provider ([#3046](#3046)) ([9b05e65](9b05e65)) * FeatureStore.apply applies BatchFeatureView correctly ([#3098](#3098)) ([41be511](41be511)) * Fix Feast Java inconsistency with int64 serialization vs python ([#3031](#3031)) ([4bba787](4bba787)) * Fix feature service inference logic ([#3089](#3089)) ([4310ed7](4310ed7)) * Fix field mapping logic during feature inference ([#3067](#3067)) ([cdfa761](cdfa761)) * Fix incorrect on demand feature view diffing and improve Java tests ([#3074](#3074)) ([0702310](0702310)) * Fix Java helm charts to work with refactored logic. Fix FTS image ([#3105](#3105)) ([2b493e0](2b493e0)) * Fix on demand feature view output in feast plan + Web UI crash ([#3057](#3057)) ([bfae6ac](bfae6ac)) * Fix release workflow to release 0.24.0 ([#3138](#3138)) ([a69aaae](a69aaae)) * Fix Spark offline store type conversion to arrow ([#3071](#3071)) ([b26566d](b26566d)) * Fixing Web UI, which fails for the SQL registry ([#3028](#3028)) ([64603b6](64603b6)) * Force Snowflake Session to Timezone UTC ([#3083](#3083)) ([9f221e6](9f221e6)) * Make infer dummy entity join key idempotent ([#3115](#3115)) ([1f5b1e0](1f5b1e0)) * More explicit error messages ([#2708](#2708)) ([e4d7afd](e4d7afd)) * Parse inline data sources ([#3036](#3036)) ([c7ba370](c7ba370)) * Prevent overwriting existing file during `persist` ([#3088](#3088)) ([69af21f](69af21f)) * Register BatchFeatureView in feature repos correctly ([#3092](#3092)) ([b8e39ea](b8e39ea)) * Return an empty infra object from sql registry when it doesn't exist ([#3022](#3022)) ([8ba87d1](8ba87d1)) * Teardown tables for Snowflake Materialization testing ([#3106](#3106)) ([0a0c974](0a0c974)) * UI error when saved dataset is present in registry. ([#3124](#3124)) ([83cf753](83cf753)) * Update sql.py ([#3096](#3096)) ([2646a86](2646a86)) * Updated snowflake template ([#3130](#3130)) ([f0594e1](f0594e1)) ### Features * Add authentication option for snowflake connector ([#3039](#3039)) ([74c75f1](74c75f1)) * Add Cassandra/AstraDB online store contribution ([#2873](#2873)) ([feb6cb8](feb6cb8)) * Add Snowflake materialization engine ([#2948](#2948)) ([f3b522b](f3b522b)) * Adding saved dataset capabilities for Postgres ([#3070](#3070)) ([d3253c3](d3253c3)) * Allow passing repo config path via flag ([#3077](#3077)) ([0d2d951](0d2d951)) * Contrib azure provider with synapse/mssql offline store and Azure registry store ([#3072](#3072)) ([9f7e557](9f7e557)) * Custom Docker image for Bytewax batch materialization ([#3099](#3099)) ([cdd1b07](cdd1b07)) * Feast AWS Athena offline store (again) ([#3044](#3044)) ([989ce08](989ce08)) * Implement spark offline store `offline_write_batch` method ([#3076](#3076)) ([5b0cc87](5b0cc87)) * Initial Bytewax materialization engine ([#2974](#2974)) ([55c61f9](55c61f9)) * Refactor feature server helm charts to allow passing feature_store.yaml in environment variables ([#3113](#3113)) ([85ee789](85ee789))
This PR adds support for Cassandra/Astra DB online store for Feast.
What this PR does / why we need it:
Self-managed Cassandra clusters and Astra DB instances (based on Apache Cassandra) are a good fit to work as online features stores. This plugin makes it possible to do so within Feast.
Notes:
make test-python
: all passed.README.md
in the module dir and the relevantrst
autodoc files.