diff --git a/docs/reference/online-stores/cassandra.md b/docs/reference/online-stores/cassandra.md index 663a7f6eac..30514305b6 100644 --- a/docs/reference/online-stores/cassandra.md +++ b/docs/reference/online-stores/cassandra.md @@ -33,6 +33,7 @@ online_store: local_dc: 'datacenter1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` {% endcode %} @@ -54,6 +55,7 @@ online_store: local_dc: 'eu-central-1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` {% endcode %} diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md index a8d7a0ec02..7d9393f30e 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md @@ -59,6 +59,7 @@ online_store: local_dc: 'datacenter1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` #### Astra DB setup: @@ -86,6 +87,7 @@ online_store: local_dc: 'eu-central-1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` #### Protocol version and load-balancing settings @@ -113,13 +115,13 @@ The former parameter is a region name for Astra DB instances (as can be verified See the source code of the online store integration for the allowed values of the latter parameter. 
-#### Read concurrency value +#### Read/write concurrency value -You can optionally specify the value of `read_concurrency`, which will be -passed to the Cassandra driver function handling -[concurrent reading of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent). -Consult the reference for guidance on this parameter (which in most cases can be left to its default value of 100). -This is relevant only for retrieval of several entities at once. +You can optionally specify the value of `read_concurrency` and `write_concurrency`, +which will be passed to the Cassandra driver function handling +[concurrent reading/writing of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent). +Consult the reference for guidance on these parameters (which in most cases can be left to their default value of 100). +This is relevant only for retrieval of several entities at once and during bulk writes, such as in the materialization step. ### More info diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index e13fd4cfba..34a8cab036 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -170,7 +170,15 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel): read_concurrency: Optional[StrictInt] = 100 """ Value of the `concurrency` parameter internally passed to Cassandra driver's - `execute_concurrent_with_args ` call. + `execute_concurrent_with_args` call when reading rows from tables. + See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent . + Default: 100. 
+ """ + + write_concurrency: Optional[StrictInt] = 100 + """ + Value of the `concurrency` parameter internally passed to Cassandra driver's + `execute_concurrent_with_args` call when writing rows to tables. See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent . Default: 100. """ @@ -327,21 +335,37 @@ def online_write_batch( display progress. """ project = config.project - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() - with tracing_span(name="remote_call"): - self._write_rows( - config, - project, - table, - entity_key_bin, - values.items(), - timestamp, - created_ts, - ) + + def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]: + """ + We craft an iterable over all rows to be inserted (entities->features), + but this way we can call `progress` after each entity is done. 
+ """ + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feature_name, val in values.items(): + params: Tuple[str, bytes, str, datetime] = ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + ) + yield params + # this happens N-1 times, will be corrected outside: + if progress: + progress(1) + + with tracing_span(name="remote_call"): + self._write_rows_concurrently( + config, + project, + table, + unroll_insertion_tuples(), + ) + # correction for the last missing call to `progress`: if progress: progress(1) @@ -458,39 +482,24 @@ def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: """ return f'"{keyspace}"."{project}_{table.name}"' - def _write_rows( + def _write_rows_concurrently( self, config: RepoConfig, project: str, table: FeatureView, - entity_key_bin: str, - features_vals: Iterable[Tuple[str, ValueProto]], - timestamp: datetime, - created_ts: Optional[datetime], + rows: Iterable[Tuple[str, bytes, str, datetime]], ): - """ - Handle the CQL (low-level) insertion of feature values to a table. - - Note: `created_ts` can be None: in that case we avoid explicitly - inserting it to prevent unnecessary tombstone creation on Cassandra. - Note: `created_ts` is being deprecated (July 2022) and the following - reflects this fact. 
- """ session: Session = self._get_session(config) keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable) - for feature_name, val in features_vals: - params: Sequence[object] = ( - feature_name, - val.SerializeToString(), - entity_key_bin, - timestamp, - ) - session.execute( - insert_cql, - params, - ) + # + execute_concurrent_with_args( + session, + insert_cql, + rows, + concurrency=config.online_store.write_concurrency, + ) def _read_rows_by_entity_keys( self, diff --git a/sdk/python/feast/templates/cassandra/bootstrap.py b/sdk/python/feast/templates/cassandra/bootstrap.py index f66ae99eff..fa70917914 100644 --- a/sdk/python/feast/templates/cassandra/bootstrap.py +++ b/sdk/python/feast/templates/cassandra/bootstrap.py @@ -122,11 +122,17 @@ def collect_cassandra_store_settings(): c_local_dc = None c_load_balancing_policy = None - needs_concurrency = click.confirm("Specify read concurrency level?", default=False) - if needs_concurrency: - c_concurrency = click.prompt(" Concurrency level?", default=100, type=int) + specify_concurrency = click.confirm("Specify concurrency levels?", default=False) + if specify_concurrency: + c_r_concurrency = click.prompt( + " Read-concurrency level?", default=100, type=int + ) + c_w_concurrency = click.prompt( + " Write-concurrency level?", default=100, type=int + ) else: - c_concurrency = None + c_r_concurrency = None + c_w_concurrency = None return { "c_secure_bundle_path": c_secure_bundle_path, @@ -138,7 +144,8 @@ def collect_cassandra_store_settings(): "c_protocol_version": c_protocol_version, "c_local_dc": c_local_dc, "c_load_balancing_policy": c_load_balancing_policy, - "c_concurrency": c_concurrency, + "c_r_concurrency": c_r_concurrency, + "c_w_concurrency": c_w_concurrency, } @@ -156,7 +163,8 @@ def apply_cassandra_store_settings(config_file, settings): 'c_protocol_version' 'c_local_dc' 
'c_load_balancing_policy' - 'c_concurrency' + 'c_r_concurrency' + 'c_w_concurrency' """ write_setting_or_remove( config_file, @@ -224,12 +232,18 @@ def apply_cassandra_store_settings(config_file, settings): remove_lines_from_file(config_file, "load_balancing:") remove_lines_from_file(config_file, "local_dc:") remove_lines_from_file(config_file, "load_balancing_policy:") - # + write_setting_or_remove( config_file, - settings["c_concurrency"], + settings["c_r_concurrency"], "read_concurrency", - "100", + "c_r_concurrency", + ) + write_setting_or_remove( + config_file, + settings["c_w_concurrency"], + "write_concurrency", + "c_w_concurrency", ) diff --git a/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml b/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml index b43790eda2..ce50275554 100644 --- a/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml @@ -16,5 +16,6 @@ online_store: load_balancing: local_dc: c_local_dc load_balancing_policy: c_load_balancing_policy - read_concurrency: 100 + read_concurrency: c_r_concurrency + write_concurrency: c_w_concurrency entity_key_serialization_version: 2