Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cassandra online store, concurrency in bulk write operations #3367

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference/online-stores/cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ online_store:
local_dc: 'datacenter1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional
```
{% endcode %}

Expand All @@ -54,6 +55,7 @@ online_store:
local_dc: 'eu-central-1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional
```
{% endcode %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ online_store:
local_dc: 'datacenter1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional
```

#### Astra DB setup:
Expand Down Expand Up @@ -86,6 +87,7 @@ online_store:
local_dc: 'eu-central-1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional
```

#### Protocol version and load-balancing settings
Expand Down Expand Up @@ -113,13 +115,13 @@ The former parameter is a region name for Astra DB instances (as can be verified
See the source code of the online store integration for the allowed values of
the latter parameter.

#### Read concurrency value
#### Read/write concurrency value

You can optionally specify the value of `read_concurrency`, which will be
passed to the Cassandra driver function handling
[concurrent reading of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent).
Consult the reference for guidance on this parameter (which in most cases can be left to its default value of 100).
This is relevant only for retrieval of several entities at once.
You can optionally specify the values of `read_concurrency` and `write_concurrency`,
which will be passed to the Cassandra driver function handling
[concurrent reading/writing of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent).
Consult the reference for guidance on these parameters (which in most cases can be left to their default value of 100).
These settings are relevant only when retrieving several entities at once and during bulk writes, such as in the materialization step.

### More info

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,15 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
read_concurrency: Optional[StrictInt] = 100
"""
Value of the `concurrency` parameter internally passed to Cassandra driver's
`execute_concurrent_with_args ` call.
`execute_concurrent_with_args` call when reading rows from tables.
See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent .
Default: 100.
"""

write_concurrency: Optional[StrictInt] = 100
"""
Value of the `concurrency` parameter internally passed to Cassandra driver's
`execute_concurrent_with_args` call when writing rows to tables.
See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent .
Default: 100.
"""
Expand Down Expand Up @@ -327,21 +335,37 @@ def online_write_batch(
display progress.
"""
project = config.project
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
with tracing_span(name="remote_call"):
self._write_rows(
config,
project,
table,
entity_key_bin,
values.items(),
timestamp,
created_ts,
)

def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]:
"""
We craft an iterable over all rows to be inserted (entities->features),
but this way we can call `progress` after each entity is done.
"""
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
for feature_name, val in values.items():
params: Tuple[str, bytes, str, datetime] = (
feature_name,
val.SerializeToString(),
entity_key_bin,
timestamp,
)
yield params
# this happens N-1 times, will be corrected outside:
if progress:
progress(1)

with tracing_span(name="remote_call"):
self._write_rows_concurrently(
config,
project,
table,
unroll_insertion_tuples(),
)
# correction for the last missing call to `progress`:
if progress:
progress(1)

Expand Down Expand Up @@ -458,39 +482,24 @@ def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str:
"""
return f'"{keyspace}"."{project}_{table.name}"'

def _write_rows(
def _write_rows_concurrently(
self,
config: RepoConfig,
project: str,
table: FeatureView,
entity_key_bin: str,
features_vals: Iterable[Tuple[str, ValueProto]],
timestamp: datetime,
created_ts: Optional[datetime],
rows: Iterable[Tuple[str, bytes, str, datetime]],
):
"""
Handle the CQL (low-level) insertion of feature values to a table.

Note: `created_ts` can be None: in that case we avoid explicitly
inserting it to prevent unnecessary tombstone creation on Cassandra.
Note: `created_ts` is being deprecated (July 2022) and the following
reflects this fact.
"""
session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable)
for feature_name, val in features_vals:
params: Sequence[object] = (
feature_name,
val.SerializeToString(),
entity_key_bin,
timestamp,
)
session.execute(
insert_cql,
params,
)
#
execute_concurrent_with_args(
session,
insert_cql,
rows,
concurrency=config.online_store.write_concurrency,
)

def _read_rows_by_entity_keys(
self,
Expand Down
32 changes: 23 additions & 9 deletions sdk/python/feast/templates/cassandra/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ def collect_cassandra_store_settings():
c_local_dc = None
c_load_balancing_policy = None

needs_concurrency = click.confirm("Specify read concurrency level?", default=False)
if needs_concurrency:
c_concurrency = click.prompt(" Concurrency level?", default=100, type=int)
specify_concurrency = click.confirm("Specify concurrency levels?", default=False)
if specify_concurrency:
c_r_concurrency = click.prompt(
" Read-concurrency level?", default=100, type=int
)
c_w_concurrency = click.prompt(
" Write-concurrency level?", default=100, type=int
)
else:
c_concurrency = None
c_r_concurrency = None
c_w_concurrency = None

return {
"c_secure_bundle_path": c_secure_bundle_path,
Expand All @@ -138,7 +144,8 @@ def collect_cassandra_store_settings():
"c_protocol_version": c_protocol_version,
"c_local_dc": c_local_dc,
"c_load_balancing_policy": c_load_balancing_policy,
"c_concurrency": c_concurrency,
"c_r_concurrency": c_r_concurrency,
"c_w_concurrency": c_w_concurrency,
}


Expand All @@ -156,7 +163,8 @@ def apply_cassandra_store_settings(config_file, settings):
'c_protocol_version'
'c_local_dc'
'c_load_balancing_policy'
'c_concurrency'
'c_r_concurrency'
'c_w_concurrency'
"""
write_setting_or_remove(
config_file,
Expand Down Expand Up @@ -224,12 +232,18 @@ def apply_cassandra_store_settings(config_file, settings):
remove_lines_from_file(config_file, "load_balancing:")
remove_lines_from_file(config_file, "local_dc:")
remove_lines_from_file(config_file, "load_balancing_policy:")
#

write_setting_or_remove(
config_file,
settings["c_concurrency"],
settings["c_r_concurrency"],
"read_concurrency",
"100",
"c_r_concurrency",
)
write_setting_or_remove(
config_file,
settings["c_w_concurrency"],
"write_concurrency",
"c_w_concurrency",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ online_store:
load_balancing:
local_dc: c_local_dc
load_balancing_policy: c_load_balancing_policy
read_concurrency: 100
read_concurrency: c_r_concurrency
write_concurrency: c_w_concurrency
entity_key_serialization_version: 2