[ENH] add `.clean_log()` to Producers #2549

Conversation
Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving:
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality

Please tag your PR title with one of: [ENH | BUG | DOC | TST | BLD | PERF | TYP | CLN | CHORE]. See https://docs.trychroma.com/contributing#contributing-code-and-ideas
@@ -243,6 +291,28 @@ def unsubscribe(self, subscription_id: UUID) -> None:
        del self._subscriptions[topic_name]
        return

    @trace_method("SqlEmbeddingsQueue.ack", OpenTelemetryGranularity.ALL)
    @override
    def ack(self, subscription_id: UUID, up_to_seq_id: SeqId) -> None:
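The body of `ack()` is truncated in the diff above. As a rough illustration only (the table schema and names here are assumptions for this sketch, not the PR's actual code), persisting a subscriber's acknowledged position with SQLite's `INSERT OR REPLACE` might look like:

```python
import sqlite3
import uuid

# Hypothetical sketch: persist each subscriber's acknowledged log position.
# The schema and names are illustrative assumptions, not the PR's real code.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE max_seq_id (segment_id TEXT PRIMARY KEY, seq_id INTEGER)"
)

def ack(subscription_id: uuid.UUID, up_to_seq_id: int) -> None:
    # INSERT OR REPLACE keeps exactly one row per segment,
    # overwriting any previously recorded position.
    conn.execute(
        "INSERT OR REPLACE INTO max_seq_id (segment_id, seq_id) VALUES (?, ?)",
        (str(subscription_id), up_to_seq_id),
    )

sid = uuid.uuid4()
ack(sid, 5)
ack(sid, 9)  # a later ack overwrites the earlier row
```

This is the pattern the thread below debates: whether the queue should be the one writing this table at all.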
This feels weird, as we're inverting the responsibility for who maintains the `max_seq_id` table. In fact, this duplicates what the metadata segment already does. Doesn't it make sense to let segments maintain their own `max_seq_id`s, and have the embedding queue hold just an in-memory representation?
Sorry, not quite understanding. Are you proposing that segments maintain `max_seq_id` themselves, and the embeddings queue calls `max_seq_id()` or something on them during `clean_log()`?
Segments already maintain their `max_seq_id`:

- Metadata (chroma/chromadb/segment/impl/metadata/sqlite.py, lines 483 to 495 in 28b3739):

```python
with self._db.tx() as cur:
    for record in records:
        q = (
            self._db.querybuilder()
            .into(Table("max_seq_id"))
            .columns("segment_id", "seq_id")
            .insert(
                ParameterValue(self._db.uuid_to_db(self._id)),
                ParameterValue(_encode_seq_id(record["log_offset"])),
            )
        )
        sql, params = get_sql(q)
        sql = sql.replace("INSERT", "INSERT OR REPLACE")
```

- HNSW, via the pickled metadata (chroma/chromadb/segment/impl/vector/local_persistent_hnsw.py, lines 213 to 222 in 28b3739):

```python
self._persist_data.max_seq_id = self._max_seq_id
# TODO: This should really be stored in sqlite, the index itself, or a better
# storage format
self._persist_data.id_to_label = self._id_to_label
self._persist_data.label_to_id = self._label_to_id
self._persist_data.id_to_seq_id = self._id_to_seq_id
with open(self._get_metadata_file(), "wb") as metadata_file:
    pickle.dump(self._persist_data, metadata_file, pickle.HIGHEST_PROTOCOL)
```

My concern was whether it is the embedding queue's responsibility to update the `max_seq_id` table, or whether to leave that to each segment. Instead, let segments report their max sequence IDs, either via `ack()` or simply via the return value of `notify_one`, and keep these in memory (i.e. as attributes for each subscriber).
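The in-memory variant being proposed here could be sketched as follows (a hypothetical simplification; class and method names are illustrative, not chromadb's API):

```python
from dataclasses import dataclass, field
from typing import Dict
from uuid import UUID, uuid4

@dataclass
class InMemoryQueue:
    # Hypothetical sketch of the proposal: the queue only tracks each
    # subscriber's reported position in memory; durable persistence of
    # max_seq_id stays with the segments themselves.
    positions: Dict[UUID, int] = field(default_factory=dict)

    def notify_one(self, subscriber_id: UUID, seq_id: int) -> None:
        # The subscriber's ack (or notify_one return value) reports
        # the highest sequence ID it has processed.
        self.positions[subscriber_id] = seq_id

    def min_position(self) -> int:
        # Log entries at or below this seq_id have been seen by all
        # known subscribers and are candidates for pruning.
        return min(self.positions.values())

q = InMemoryQueue()
a, b = uuid4(), uuid4()
q.notify_one(a, 10)
q.notify_one(b, 7)
```

The weakness the next comment raises is visible here: `min_position()` is only correct if every segment has actually subscribed and reported.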
Right, I see. There are two main issues with keeping max sequence IDs in memory on the embeddings queue:

- It assumes all segments have subscribed to the embeddings queue before `clean_log()` is called. This may be true in the vast majority of cases, but it feels like a correctness bug waiting to happen. You could add an assertion that `len(subscribers) == 2`, but IMO that's not materially better than just persisting the max sequence ID.
- It requires segments to be loaded into memory for `clean_log()` to work. In most cases the segments will probably already be in memory, but this could make `chroma vacuum` slower and the implementation a little annoying.
You don't have to keep `max_seq_id` in memory; instead, only query it, and let segments update the `max_seq_id` table on their own (i.e. responsibility for persisting `max_seq_id` lies with the owner of the counter: each segment).

Here's a hypothetical situation:

- Add a large batch (one that overflows the threshold).
- The vector segment updates its metadata to store the `max_seq_id`.
- The embedding queue fails to update the `max_seq_id` (not very likely, but also not impossible). We roll back the whole shenanigan, but you still have vectors added to the index. This, too, is a correctness bug waiting to happen :)

I hope you see my point about the responsibility of segments to manage their own `max_seq_id`, making the embedding queue only a consumer (i.e. it only queries the state) upon `clean_log()`.
> Embedding queue fails to update the max_seq_id - not very likely, but also not impossible. We roll back the whole shenanigan, but you still have vectors added to the index. This, too, is a correctness bug waiting to happen :)

Sorry, I'm not sure how this could result in a correctness bug. At worst, the embeddings queue won't prune processed WAL records?

> I hope you see my point about the responsibility of segments to manage their own max_seq_id and make the embedding queue only a consumer (aka only query) the state upon clean_log().

To clarify: in this implementation, would `clean_log()` directly inspect the `max_seq_id` table and/or the pickled metadata file?
After thinking through this some more, I'd be fine having segments be responsible for updating the `max_seq_id` table themselves. We then don't need the `ack()` method. The only thing that's a little weird then is that `clean_log()` will be directly accessing the `max_seq_id` table.

Maybe we have a new component, `LogPosition`, that owns reads/writes for `max_seq_id`, and we put `clean_log()` there? That seems fairly clean.
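A `LogPosition` component along these lines might be sketched as follows (hypothetical; the thread does not confirm the PR adopted exactly this shape, and the schema here is an assumption):

```python
import sqlite3

class LogPosition:
    # Hypothetical sketch of the proposed component: it owns all reads and
    # writes of the max_seq_id table, giving clean_log() a natural home.
    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS max_seq_id "
            "(segment_id TEXT PRIMARY KEY, seq_id INTEGER)"
        )
        conn.execute(
            "CREATE TABLE IF NOT EXISTS embeddings_queue "
            "(seq_id INTEGER PRIMARY KEY, payload BLOB)"
        )

    def set_position(self, segment_id: str, seq_id: int) -> None:
        # Segments call this themselves; the queue never writes the table.
        self.conn.execute(
            "INSERT OR REPLACE INTO max_seq_id VALUES (?, ?)",
            (segment_id, seq_id),
        )

    def clean_log(self) -> None:
        # Delete WAL entries that every segment has already processed.
        row = self.conn.execute("SELECT MIN(seq_id) FROM max_seq_id").fetchone()
        if row[0] is not None:
            self.conn.execute(
                "DELETE FROM embeddings_queue WHERE seq_id <= ?", (row[0],)
            )

lp = LogPosition(sqlite3.connect(":memory:"))
for i in range(1, 6):
    lp.conn.execute("INSERT INTO embeddings_queue VALUES (?, ?)", (i, b""))
lp.set_position("metadata", 4)
lp.set_position("vector", 3)
lp.clean_log()  # prunes entries up to min(4, 3) = 3
```

Reads and writes of the table stay behind one interface, which avoids the "queue pokes at a segment-owned table" awkwardness discussed above.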
> ...segments be responsible for updating the max_seq_id table themselves. We then don't need the ack() method. The only thing that's a little weird then is that clean_log() will be directly accessing the max_seq_id table.

I think this is a slight improvement. Comparison in 41a7131. The end of this comment lists some of the considerations I've had; I think this approach probably does balance them slightly better.

> Maybe we have a new component, LogPosition, that owns reads/writes for max_seq_id and we put clean_log() there? That seems fairly clean.

Spent a few minutes prototyping this:

- Added a new `SqliteDB` mixin, `SqlSegmentLogPosition`.
- The mixin requires `clean_log()` and `ack()` to be defined on the abstract `SqlDB` to avoid a circular import (👎).

For the sake of writing things down, some of the things I've been considering throughout the iterations of this implementation:

- `clean_log()` assumes that `max_seq_id` entries are kept up-to-date (suggesting that the mutation logic for `max_seq_id` should be co-located with `clean_log()`).
- `clean_log()` and `max_seq_id` are only applicable to single-node deployments (suggesting that the Consumer/Producer interfaces should not be modified).
- "Subscribers" are ephemeral, while "segments" are persistent; `max_seq_id` has one row per segment, not one row per subscriber.
- We should be able to get the `max_seq_id` of segments without having to load them into memory.
I'd really like to just commit to a path at this point and stop bikeshedding. It's not solely the fault of this thread; this feature has just been very delayed after chasing down the various bugs that fell out and the multiple permutations of the CIP (although I do think the user-facing side of this is far better after going through those permutations!).
    embeddings_queue, system.instance(SegmentManager)
)

embeddings_queue.clean_log(coll.id)
This exercises the `max_seq_id` migration: if the migration doesn't happen or fails, `.clean_log()` will have no effect (because the `max_seq_id` for one of the segments is missing), and the invariant check immediately below will fail if the number of embeddings added is greater than the sync threshold.
Depends on #2545.

Changes:

- Adds a `clean_log()` method to producers (not called automatically in this PR).
- `max_seq_id` is now used to track the maximum seen sequence ID for both metadata and vector segments (formerly it was only used by metadata segments).
- Segments are now responsible for updating the `max_seq_id` table themselves.
- Migrates the `max_seq_id` field from the old pickled metadata file into the database upon init.

In this PR, log entries are deleted on a per-collection basis. The next PR in this stack deletes entries globally.