From 501c48a58f768317e635ebd376b950f8047a4637 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 1 Aug 2024 17:48:01 -0700 Subject: [PATCH 1/4] [BUG] fix multi collection log purge --- chromadb/db/mixins/embeddings_queue.py | 7 +++++-- chromadb/ingest/__init__.py | 2 +- chromadb/logservice/logservice.py | 2 +- chromadb/test/db/test_log_purge.py | 26 ++++++++++++++++++++++++++ chromadb/test/property/invariants.py | 21 ++++++++++----------- 5 files changed, 43 insertions(+), 15 deletions(-) create mode 100644 chromadb/test/db/test_log_purge.py diff --git a/chromadb/db/mixins/embeddings_queue.py b/chromadb/db/mixins/embeddings_queue.py index ef8d7b348de..914e14e27a1 100644 --- a/chromadb/db/mixins/embeddings_queue.py +++ b/chromadb/db/mixins/embeddings_queue.py @@ -130,7 +130,7 @@ def delete_log(self, collection_id: UUID) -> None: @trace_method("SqlEmbeddingsQueue.purge_log", OpenTelemetryGranularity.ALL) @override - def purge_log(self) -> None: + def purge_log(self, collection_id: UUID) -> None: segments_t = Table("segments") segment_ids_q = ( self.querybuilder() @@ -140,6 +140,9 @@ def purge_log(self) -> None: # - > 1 has never written to the max_seq_id table # In that case, we should not delete any WAL entries as we can't be sure that all segments are caught up.
.select(functions.Coalesce(Table("max_seq_id").seq_id, -1)) + .where( + segments_t.collection == ParameterValue(self.uuid_to_db(collection_id)) + ) .left_join(Table("max_seq_id")) .on(segments_t.id == Table("max_seq_id").segment_id) ) @@ -255,7 +258,7 @@ def submit_embeddings( self._notify_all(topic_name, embedding_records) if self.config.get_parameter("automatically_purge").value: - self.purge_log() + self.purge_log(collection_id) return seq_ids diff --git a/chromadb/ingest/__init__.py b/chromadb/ingest/__init__.py index 179e9e09eda..7d6d2a46e7b 100644 --- a/chromadb/ingest/__init__.py +++ b/chromadb/ingest/__init__.py @@ -42,7 +42,7 @@ def delete_log(self, collection_id: UUID) -> None: pass @abstractmethod - def purge_log(self) -> None: + def purge_log(self, collection_id: UUID) -> None: """Truncates the log for the given collection, removing all seen records.""" pass diff --git a/chromadb/logservice/logservice.py b/chromadb/logservice/logservice.py index 1c9c3477b53..78b7b8adeda 100644 --- a/chromadb/logservice/logservice.py +++ b/chromadb/logservice/logservice.py @@ -76,7 +76,7 @@ def delete_log(self, collection_id: UUID) -> None: @trace_method("LogService.purge_log", OpenTelemetryGranularity.ALL) @override - def purge_log(self) -> None: + def purge_log(self, collection_id: UUID) -> None: raise NotImplementedError("Not implemented") @trace_method("LogService.submit_embedding", OpenTelemetryGranularity.ALL) diff --git a/chromadb/test/db/test_log_purge.py b/chromadb/test/db/test_log_purge.py new file mode 100644 index 00000000000..3ceb635aa69 --- /dev/null +++ b/chromadb/test/db/test_log_purge.py @@ -0,0 +1,26 @@ +from chromadb.api.client import Client +from chromadb.config import System +from chromadb.test.property import invariants + + +def test_log_purge(sqlite_persistent: System) -> None: + client = Client.from_system(sqlite_persistent) + + first_collection = client.create_collection( + "first_collection", metadata={"hnsw:sync_threshold": 10, 
"hnsw:batch_size": 10} + ) + second_collection = client.create_collection( + "second_collection", metadata={"hnsw:sync_threshold": 10, "hnsw:batch_size": 10} + ) + collections = [first_collection, second_collection] + + # (Does not trigger a purge) + for i in range(5): + first_collection.add(ids=str(i), embeddings=[i, i]) + + # (Should trigger a purge) + for i in range(100): + second_collection.add(ids=str(i), embeddings=[i, i]) + + # The purge of the second collection should not be blocked by the first + invariants.log_size_below_max(client._system, collections, True) diff --git a/chromadb/test/property/invariants.py b/chromadb/test/property/invariants.py index 69987234a84..4bf83a1baf9 100644 --- a/chromadb/test/property/invariants.py +++ b/chromadb/test/property/invariants.py @@ -1,6 +1,5 @@ import gc import math -from chromadb.api.configuration import HNSWConfigurationInternal from chromadb.config import System from chromadb.db.base import get_sql from chromadb.db.impl.sqlite import SqliteDB @@ -334,7 +333,7 @@ def _total_embedding_queue_log_size(sqlite: SqliteDB) -> int: def log_size_below_max( - system: System, collection: Collection, has_collection_mutated: bool + system: System, collections: List[Collection], has_collection_mutated: bool ) -> None: sqlite = system.instance(SqliteDB) @@ -342,21 +341,21 @@ def log_size_below_max( # Must always keep one entry to avoid reusing seq_ids assert _total_embedding_queue_log_size(sqlite) >= 1 - hnsw_config = cast( - HNSWConfigurationInternal, - collection.get_model() - .get_configuration() - .get_parameter("hnsw_configuration") - .value, + # We purge per-collection as the sync_threshold is a per-collection setting + sync_threshold_sum = sum( + collection.metadata.get("hnsw:sync_threshold", 1000) + for collection in collections + ) + batch_size_sum = sum( + collection.metadata.get("hnsw:batch_size", 1000) + for collection in collections ) - sync_threshold = cast(int, hnsw_config.get_parameter("sync_threshold").value) - 
batch_size = cast(int, hnsw_config.get_parameter("batch_size").value) # -1 is used because the queue is always at least 1 entry long, so deletion stops before the max ack'ed sequence ID. # And if the batch_size != sync_threshold, the queue can have up to batch_size - 1 more entries. assert ( _total_embedding_queue_log_size(sqlite) - 1 - <= sync_threshold + batch_size - 1 + <= sync_threshold_sum + batch_size_sum - 1 ) else: assert _total_embedding_queue_log_size(sqlite) == 0 From e019169645086f2d992487f8b310208536847496 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 2 Aug 2024 13:27:43 -0700 Subject: [PATCH 2/4] Fix tests --- chromadb/db/mixins/embeddings_queue.py | 2 ++ chromadb/test/property/test_cross_version_persist.py | 6 +++--- chromadb/test/property/test_embeddings.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/chromadb/db/mixins/embeddings_queue.py b/chromadb/db/mixins/embeddings_queue.py index 914e14e27a1..324d6d0d894 100644 --- a/chromadb/db/mixins/embeddings_queue.py +++ b/chromadb/db/mixins/embeddings_queue.py @@ -131,6 +131,8 @@ def delete_log(self, collection_id: UUID) -> None: @trace_method("SqlEmbeddingsQueue.purge_log", OpenTelemetryGranularity.ALL) @override def purge_log(self, collection_id: UUID) -> None: + # (We need to purge on a per topic/collection basis, because the maximum sequence ID is tracked on a per topic/collection basis.) 
+ segments_t = Table("segments") segment_ids_q = ( self.querybuilder() diff --git a/chromadb/test/property/test_cross_version_persist.py b/chromadb/test/property/test_cross_version_persist.py index fa44088ee35..91ea7bb76c1 100644 --- a/chromadb/test/property/test_cross_version_persist.py +++ b/chromadb/test/property/test_cross_version_persist.py @@ -372,8 +372,8 @@ def test_cycle_versions( embeddings_queue, system.instance(SegmentManager) ) - embeddings_queue.purge_log() - invariants.log_size_below_max(system, coll, True) + embeddings_queue.purge_log(coll.id) + invariants.log_size_below_max(system, [coll], True) # Should be able to add embeddings coll.add(**embeddings_strategy) # type: ignore @@ -383,7 +383,7 @@ def test_cycle_versions( invariants.documents_match(coll, embeddings_strategy) invariants.ids_match(coll, embeddings_strategy) invariants.ann_accuracy(coll, embeddings_strategy) - invariants.log_size_below_max(system, coll, True) + invariants.log_size_below_max(system, [coll], True) # Shutdown system system.stop() diff --git a/chromadb/test/property/test_embeddings.py b/chromadb/test/property/test_embeddings.py index 1076a96ab3e..dc53bbc52d7 100644 --- a/chromadb/test/property/test_embeddings.py +++ b/chromadb/test/property/test_embeddings.py @@ -222,7 +222,7 @@ def fields_match(self) -> None: def log_size_below_max(self) -> None: system: System = self.client._system # type: ignore invariants.log_size_below_max( - system, self.collection, self.has_collection_mutated + system, [self.collection], self.has_collection_mutated ) def _upsert_embeddings(self, record_set: strategies.RecordSet) -> None: From 088615b97aa465ab399455028a873b033ea76ff5 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 2 Aug 2024 13:47:24 -0700 Subject: [PATCH 3/4] Fix failing instance of chroma utils vacuum --- chromadb/cli/cli.py | 13 ++++++++++--- chromadb/test/property/invariants.py | 6 +++++- chromadb/test/test_cli.py | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 
4 deletions(-) diff --git a/chromadb/cli/cli.py b/chromadb/cli/cli.py index 7bc6f116af0..bd44dfdda81 100644 --- a/chromadb/cli/cli.py +++ b/chromadb/cli/cli.py @@ -131,6 +131,7 @@ def vacuum( settings.is_persistent = True settings.persist_directory = path system = System(settings=settings) + system.start() sqlite = system.instance(SqliteDB) directory_size_before_vacuum = get_directory_size(path) @@ -142,18 +143,24 @@ def vacuum( TextColumn("[progress.description]{task.description}"), transient=True, ) as progress: - task = progress.add_task("Purging the log...") + with sqlite.tx() as cur: + cur.execute("SELECT id FROM collections") + collection_ids = [row[0] for row in cur.fetchall()] + + task = progress.add_task("Purging the log...", total=len(collection_ids)) try: # Cleaning the log after upgrading to >=0.6 is dependent on vector segments migrating their max_seq_id from the pickled metadata file to SQLite. # Vector segments migrate this field automatically on init, but at this point the segment has not been loaded yet. 
trigger_vector_segments_max_seq_id_migration( sqlite, system.instance(SegmentManager) ) - sqlite.purge_log() + + for collection_id in collection_ids: + sqlite.purge_log(collection_id) + progress.update(task, advance=1) except Exception as e: console.print(f"[bold red]Error purging the log:[/bold red] {e}") raise typer.Exit(code=1) - progress.update(task, advance=100) task = progress.add_task("Vacuuming (this may take a while)...") try: diff --git a/chromadb/test/property/invariants.py b/chromadb/test/property/invariants.py index 4bf83a1baf9..0a3abe7bc6f 100644 --- a/chromadb/test/property/invariants.py +++ b/chromadb/test/property/invariants.py @@ -344,10 +344,14 @@ def log_size_below_max( # We purge per-collection as the sync_threshold is a per-collection setting sync_threshold_sum = sum( collection.metadata.get("hnsw:sync_threshold", 1000) + if collection.metadata is not None + else 1000 for collection in collections ) batch_size_sum = sum( - collection.metadata.get("hnsw:batch_size", 1000) + collection.metadata.get("hnsw:batch_size", 100) + if collection.metadata is not None + else 100 for collection in collections ) diff --git a/chromadb/test/test_cli.py b/chromadb/test/test_cli.py index 5eb593a4098..d6c82a27db5 100644 --- a/chromadb/test/test_cli.py +++ b/chromadb/test/test_cli.py @@ -4,12 +4,17 @@ from typer.testing import CliRunner +from chromadb.api.client import Client +from chromadb.api.models.Collection import Collection from chromadb.cli.cli import app from chromadb.cli.utils import set_log_file_path from chromadb.config import Settings, System from chromadb.db.base import get_sql from chromadb.db.impl.sqlite import SqliteDB from pypika import Table +import numpy as np + +from chromadb.test.property import invariants runner = CliRunner() @@ -44,6 +49,19 @@ def test_vacuum(sqlite_persistent: System) -> None: config.set_parameter("automatically_purge", False) sqlite.set_config(config) + # Add some data + client = Client.from_system(system) + collection1 = 
client.create_collection("collection1") + collection2 = client.create_collection("collection2") + + def add_records(collection: Collection, num: int) -> None: + ids = [str(i) for i in range(num)] + embeddings = np.random.rand(num, 2) + collection.add(ids=ids, embeddings=embeddings) + + add_records(collection1, 100) + add_records(collection2, 2_000) + # Maintenance log should be empty with sqlite.tx() as cur: t = Table("maintenance_log") @@ -70,8 +88,14 @@ def test_vacuum(sqlite_persistent: System) -> None: assert rows[0][2] == "vacuum" # Automatic pruning should have been enabled + del ( + sqlite.config + ) # the CLI will end up starting a new instance of sqlite, so we need to force-refresh the cached config here assert sqlite.config.get_parameter("automatically_purge").value + # Log should be clean + invariants.log_size_below_max(system, [collection1, collection2], True) + def simulate_transactional_write( settings: Settings, ready_event: Event, shutdown_event: Event From bca409fe6ae6f081351428abcfcf0de55fc95714 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Tue, 6 Aug 2024 16:04:15 -0700 Subject: [PATCH 4/4] Use list_collections() --- chromadb/cli/cli.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/chromadb/cli/cli.py b/chromadb/cli/cli.py index bd44dfdda81..14ab1bc34a3 100644 --- a/chromadb/cli/cli.py +++ b/chromadb/cli/cli.py @@ -9,6 +9,7 @@ import os import webbrowser +from chromadb.api.client import Client from chromadb.cli.utils import get_directory_size, set_log_file_path, sizeof_fmt from chromadb.config import Settings, System from chromadb.db.impl.sqlite import SqliteDB @@ -132,6 +133,7 @@ def vacuum( settings.persist_directory = path system = System(settings=settings) system.start() + client = Client.from_system(system) sqlite = system.instance(SqliteDB) directory_size_before_vacuum = get_directory_size(path) @@ -143,11 +145,8 @@ def vacuum( TextColumn("[progress.description]{task.description}"), transient=True, ) as 
progress: - with sqlite.tx() as cur: - cur.execute("SELECT id FROM collections") - collection_ids = [row[0] for row in cur.fetchall()] - - task = progress.add_task("Purging the log...", total=len(collection_ids)) + collections = client.list_collections() + task = progress.add_task("Purging the log...", total=len(collections)) try: # Cleaning the log after upgrading to >=0.6 is dependent on vector segments migrating their max_seq_id from the pickled metadata file to SQLite. # Vector segments migrate this field automatically on init, but at this point the segment has not been loaded yet. @@ -155,8 +154,8 @@ def vacuum( sqlite, system.instance(SegmentManager) ) - for collection_id in collection_ids: - sqlite.purge_log(collection_id) + for collection in collections: + sqlite.purge_log(collection_id=collection.id) progress.update(task, advance=1) except Exception as e: console.print(f"[bold red]Error purging the log:[/bold red] {e}")