Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] fix multi collection log purge #2617

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions chromadb/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def vacuum(
settings.is_persistent = True
settings.persist_directory = path
system = System(settings=settings)
system.start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixes a bug where if trigger_vector_segments_max_seq_id_migration() below attempted to load a segment, it would throw because the SQLite component wasn't technically started
this scenario happens when upgrading from an old version or when any collection had not yet hit its first sync_threshold persist trigger
this was not caught earlier because the CLI test didn't add any collections; I updated the test to trigger this path

sqlite = system.instance(SqliteDB)

directory_size_before_vacuum = get_directory_size(path)
Expand All @@ -142,18 +143,24 @@ def vacuum(
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
task = progress.add_task("Purging the log...")
with sqlite.tx() as cur:
cur.execute("SELECT id FROM collections")
collection_ids = [row[0] for row in cur.fetchall()]

task = progress.add_task("Purging the log...", total=len(collection_ids))
codetheweb marked this conversation as resolved.
Show resolved Hide resolved
try:
# Cleaning the log after upgrading to >=0.6 is dependent on vector segments migrating their max_seq_id from the pickled metadata file to SQLite.
# Vector segments migrate this field automatically on init, but at this point the segment has not been loaded yet.
trigger_vector_segments_max_seq_id_migration(
sqlite, system.instance(SegmentManager)
)
sqlite.purge_log()

for collection_id in collection_ids:
sqlite.purge_log(collection_id)
codetheweb marked this conversation as resolved.
Show resolved Hide resolved
progress.update(task, advance=1)
except Exception as e:
console.print(f"[bold red]Error purging the log:[/bold red] {e}")
raise typer.Exit(code=1)
progress.update(task, advance=100)

task = progress.add_task("Vacuuming (this may take a while)...")
try:
Expand Down
9 changes: 7 additions & 2 deletions chromadb/db/mixins/embeddings_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def delete_log(self, collection_id: UUID) -> None:

@trace_method("SqlEmbeddingsQueue.purge_log", OpenTelemetryGranularity.ALL)
@override
def purge_log(self) -> None:
def purge_log(self, collection_id: UUID) -> None:
# (We need to purge on a per topic/collection basis, because the maximum sequence ID is tracked on a per topic/collection basis.)

segments_t = Table("segments")
segment_ids_q = (
self.querybuilder()
Expand All @@ -140,6 +142,9 @@ def purge_log(self) -> None:
# - > 1 has never written to the max_seq_id table
# In that case, we should not delete any WAL entries as we can't be sure that all segments are caught up.
.select(functions.Coalesce(Table("max_seq_id").seq_id, -1))
.where(
segments_t.collection == ParameterValue(self.uuid_to_db(collection_id))
)
.left_join(Table("max_seq_id"))
.on(segments_t.id == Table("max_seq_id").segment_id)
)
Expand Down Expand Up @@ -255,7 +260,7 @@ def submit_embeddings(
self._notify_all(topic_name, embedding_records)

if self.config.get_parameter("automatically_purge").value:
self.purge_log()
self.purge_log(collection_id)

return seq_ids

Expand Down
2 changes: 1 addition & 1 deletion chromadb/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def delete_log(self, collection_id: UUID) -> None:
pass

@abstractmethod
def purge_log(self) -> None:
def purge_log(self, collection_id: UUID) -> None:
    """Truncates the log for the given collection, removing all seen records.

    Purging is per-collection because the maximum acknowledged sequence ID
    is tracked on a per-collection (topic) basis.

    Args:
        collection_id: ID of the collection whose log entries should be purged.
    """
    pass

Expand Down
2 changes: 1 addition & 1 deletion chromadb/logservice/logservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def delete_log(self, collection_id: UUID) -> None:

@trace_method("LogService.purge_log", OpenTelemetryGranularity.ALL)
@override
def purge_log(self) -> None:
def purge_log(self, collection_id: UUID) -> None:
    """Purging is not supported by the log service client; the distributed
    log service manages its own log compaction server-side."""
    raise NotImplementedError("Not implemented")

@trace_method("LogService.submit_embedding", OpenTelemetryGranularity.ALL)
Expand Down
26 changes: 26 additions & 0 deletions chromadb/test/db/test_log_purge.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternative: a new multi-collection state machine that creates records
(this is harder than just modifying the existing multi-collection state machine because invariants can't inspect bundles)

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from chromadb.api.client import Client
from chromadb.config import System
from chromadb.test.property import invariants


def test_log_purge(sqlite_persistent: System) -> None:
    """A collection that crosses its sync threshold should have its WAL
    purged even when a sibling collection has not yet persisted anything."""
    client = Client.from_system(sqlite_persistent)

    hnsw_settings = {"hnsw:sync_threshold": 10, "hnsw:batch_size": 10}
    first_collection = client.create_collection(
        "first_collection", metadata=dict(hnsw_settings)
    )
    second_collection = client.create_collection(
        "second_collection", metadata=dict(hnsw_settings)
    )

    # Five records stay below the sync threshold, so no purge fires here.
    for record_id in range(5):
        first_collection.add(ids=str(record_id), embeddings=[record_id, record_id])

    # One hundred records cross the threshold repeatedly, triggering purges.
    for record_id in range(100):
        second_collection.add(ids=str(record_id), embeddings=[record_id, record_id])

    # The purge of the second collection should not be blocked by the first
    invariants.log_size_below_max(
        client._system, [first_collection, second_collection], True
    )
25 changes: 14 additions & 11 deletions chromadb/test/property/invariants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import gc
import math
from chromadb.api.configuration import HNSWConfigurationInternal
from chromadb.config import System
from chromadb.db.base import get_sql
from chromadb.db.impl.sqlite import SqliteDB
Expand Down Expand Up @@ -334,29 +333,33 @@ def _total_embedding_queue_log_size(sqlite: SqliteDB) -> int:


def log_size_below_max(
system: System, collection: Collection, has_collection_mutated: bool
system: System, collections: List[Collection], has_collection_mutated: bool
) -> None:
sqlite = system.instance(SqliteDB)

if has_collection_mutated:
# Must always keep one entry to avoid reusing seq_ids
assert _total_embedding_queue_log_size(sqlite) >= 1

hnsw_config = cast(
HNSWConfigurationInternal,
collection.get_model()
.get_configuration()
.get_parameter("hnsw_configuration")
.value,
Comment on lines -346 to -350
Copy link
Contributor Author

@codetheweb codetheweb Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using .get_model().get_configuration() was always returning the default config
not sure if this was due to a recent change because I'm pretty sure it was working correctly when I added it

# We purge per-collection as the sync_threshold is a per-collection setting
sync_threshold_sum = sum(
collection.metadata.get("hnsw:sync_threshold", 1000)
if collection.metadata is not None
else 1000
for collection in collections
)
batch_size_sum = sum(
collection.metadata.get("hnsw:batch_size", 100)
if collection.metadata is not None
else 100
for collection in collections
)
sync_threshold = cast(int, hnsw_config.get_parameter("sync_threshold").value)
batch_size = cast(int, hnsw_config.get_parameter("batch_size").value)

# -1 is used because the queue is always at least 1 entry long, so deletion stops before the max ack'ed sequence ID.
# And if the batch_size != sync_threshold, the queue can have up to batch_size - 1 more entries.
assert (
_total_embedding_queue_log_size(sqlite) - 1
<= sync_threshold + batch_size - 1
<= sync_threshold_sum + batch_size_sum - 1
)
else:
assert _total_embedding_queue_log_size(sqlite) == 0
6 changes: 3 additions & 3 deletions chromadb/test/property/test_cross_version_persist.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ def test_cycle_versions(
embeddings_queue, system.instance(SegmentManager)
)

embeddings_queue.purge_log()
invariants.log_size_below_max(system, coll, True)
embeddings_queue.purge_log(coll.id)
invariants.log_size_below_max(system, [coll], True)

# Should be able to add embeddings
coll.add(**embeddings_strategy) # type: ignore
Expand All @@ -383,7 +383,7 @@ def test_cycle_versions(
invariants.documents_match(coll, embeddings_strategy)
invariants.ids_match(coll, embeddings_strategy)
invariants.ann_accuracy(coll, embeddings_strategy)
invariants.log_size_below_max(system, coll, True)
invariants.log_size_below_max(system, [coll], True)

# Shutdown system
system.stop()
2 changes: 1 addition & 1 deletion chromadb/test/property/test_embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def fields_match(self) -> None:
def log_size_below_max(self) -> None:
system: System = self.client._system # type: ignore
invariants.log_size_below_max(
system, self.collection, self.has_collection_mutated
system, [self.collection], self.has_collection_mutated
)

def _upsert_embeddings(self, record_set: strategies.RecordSet) -> None:
Expand Down
24 changes: 24 additions & 0 deletions chromadb/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

from typer.testing import CliRunner

from chromadb.api.client import Client
from chromadb.api.models.Collection import Collection
from chromadb.cli.cli import app
from chromadb.cli.utils import set_log_file_path
from chromadb.config import Settings, System
from chromadb.db.base import get_sql
from chromadb.db.impl.sqlite import SqliteDB
from pypika import Table
import numpy as np

from chromadb.test.property import invariants

runner = CliRunner()

Expand Down Expand Up @@ -44,6 +49,19 @@ def test_vacuum(sqlite_persistent: System) -> None:
config.set_parameter("automatically_purge", False)
sqlite.set_config(config)

# Add some data
client = Client.from_system(system)
collection1 = client.create_collection("collection1")
collection2 = client.create_collection("collection2")

def add_records(collection: Collection, num: int) -> None:
ids = [str(i) for i in range(num)]
embeddings = np.random.rand(num, 2)
collection.add(ids=ids, embeddings=embeddings)

add_records(collection1, 100)
add_records(collection2, 2_000)

# Maintenance log should be empty
with sqlite.tx() as cur:
t = Table("maintenance_log")
Expand All @@ -70,8 +88,14 @@ def test_vacuum(sqlite_persistent: System) -> None:
assert rows[0][2] == "vacuum"

# Automatic pruning should have been enabled
del (
sqlite.config
) # the CLI will end up starting a new instance of sqlite, so we need to force-refresh the cached config here
assert sqlite.config.get_parameter("automatically_purge").value

# Log should be clean
invariants.log_size_below_max(system, [collection1, collection2], True)


def simulate_transactional_write(
settings: Settings, ready_event: Event, shutdown_event: Event
Expand Down
Loading