
[Bug]: Automatic and CLI Log Purging not working with multiple collections #2605

Closed
tazarov opened this issue Jul 31, 2024 · 4 comments · Fixed by #2617
Labels: bug (Something isn't working)

tazarov commented Jul 31, 2024

What happened?

Automatic and CLI log purging does not work correctly when there are multiple collections. purge_log() picks the lowest seq_id across all collections' segments and deletes WAL entries only up to that point, leaving the rest of the WAL intact. This is a problem for users with multiple collections: active collections keep growing the WAL, while a single slow-moving or inactive collection prevents purge_log() from ever cleaning it up.
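
To make the failure mode concrete, here is a toy model of the behavior described above (illustration only, not Chroma's actual code; the `wal` layout and the `max_synced` watermarks are hypothetical): purging below the global minimum watermark barely shrinks the WAL when one collection lags behind.

```python
# Simulated WAL: 10 interleaved entries across two collections.
wal = [(seq, f"collection-{seq % 2}") for seq in range(1, 11)]

# Hypothetical per-collection watermarks: collection-0 is fully synced,
# collection-1 has only synced up to seq_id 2.
max_synced = {"collection-0": 10, "collection-1": 2}

# Current behavior: purge only entries strictly below the *minimum* watermark.
min_seq_id = min(max_synced.values())  # 2
purged = [entry for entry in wal if entry[0] < min_seq_id]
remaining = [entry for entry in wal if entry[0] >= min_seq_id]

print(len(purged), len(remaining))  # 1 9 -- nine of ten entries survive the purge
```

Even though collection-0 is fully synced, its nine WAL entries survive because the lagging collection-1 pins the global minimum at seq_id 2.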


To reproduce:

import threading
import random
import uuid
from concurrent.futures import ThreadPoolExecutor
import chromadb
import numpy as np

client = chromadb.PersistentClient("to_vacuum_or_not_to_vacuum2")

def add_and_delete(cid):
    tid = threading.get_ident()
    collection = client.get_or_create_collection(f"vac-{cid}")
    ids = [f"{uuid.uuid4()}" for _ in range(1000)]
    collection.add(ids=ids, embeddings=np.random.uniform(-1, 1, (1000, 1536)))
    print(f"{tid}: Added {len(ids)} embeddings to collection {collection.name}")
    delete_ids = list({random.choice(ids) for _ in range(100)})  # set dedupes, so <= 100
    collection.delete(ids=delete_ids)
    print(f"{tid}: Deleted {len(delete_ids)} embeddings from collection {collection.name}")
    print(f"{tid} Done")

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(add_and_delete, i) for i in range(10)]
    for future in futures:
        future.result()  # Ensure all threads complete

(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum2/chroma.sqlite3 'select count(*) from embeddings_queue'
9956
(venv) [chroma-taz-21] python -m chromadb.cli.cli utils vacuum --path to_vacuum_or_not_to_vacuum2
Are you sure you want to vacuum the database? This will block both reads and writes to the database and may take a while. We recommend shutting down the server before running this command. Continue? [y/N]: y

🧼 vacuum complete! Database size reduced by 8.1MiB (⬇ 5.7%).
(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum2/chroma.sqlite3 'select count(*) from embeddings_queue'
9956

Versions

ea3ec08 onwards

Relevant log output

No response

@tazarov tazarov added the bug Something isn't working label Jul 31, 2024

tazarov commented Jul 31, 2024

This is reproducible on a Chroma server:

docker build -t chroma:main .
docker run -it -v ./to_vacuum_or_not_to_vacuum3:/chroma/chroma -p 8000:8000 -e ALLOW_RESET=TRUE --rm chroma:main
import threading
import random
import uuid
from concurrent.futures import ThreadPoolExecutor
import chromadb
import numpy as np

client = chromadb.HttpClient()

def add_and_delete(cid):
    tid = threading.get_ident()
    collection = client.get_or_create_collection(f"vac-{cid}")
    ids = [f"{uuid.uuid4()}" for _ in range(1000)]
    collection.add(ids=ids, embeddings=np.random.uniform(-1, 1, (1000, 1536)))
    print(f"{tid}: Added {len(ids)} embeddings to collection {collection.name}")
    delete_ids = list({random.choice(ids) for _ in range(100)})  # set dedupes, so <= 100
    collection.delete(ids=delete_ids)
    print(f"{tid}: Deleted {len(delete_ids)} embeddings from collection {collection.name}")
    print(f"{tid} Done")

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(add_and_delete, i) for i in range(10)]
    for future in futures:
        future.result()  # Ensure all threads complete

(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum3/chroma.sqlite3 'select count(*) from embeddings_queue'
9953
(venv) [chroma-taz-21] python -m chromadb.cli.cli utils vacuum --path to_vacuum_or_not_to_vacuum3
Are you sure you want to vacuum the database? This will block both reads and writes to the database and may take a while. We recommend shutting down the server before running this command. Continue? [y/N]: y

🧼 vacuum complete! Database size reduced by 8.1MiB (⬇ 5.7%).
(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum3/chroma.sqlite3 'select count(*) from embeddings_queue'
9953

(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum3/chroma.sqlite3 'select count(*) from embeddings_queue group by topic'
1094
1095
99
1094
1096
1092
1093
1097
1097
1096


tazarov commented Jul 31, 2024

Actually, upon further investigation, the issue seems to be that automatic purging does not work when users have more than one collection:

(venv) [chroma-taz-21] sqlite3 to_vacuum_or_not_to_vacuum4/chroma.sqlite3 'select topic,count(*) from embeddings_queue group by topic'
persistent://default/default/61ca040c-b8f6-4d24-9565-0da4d7bbc962|1
persistent://default/default/8b785a0f-9dc3-47fe-8383-443806489eec|1000
persistent://default/default/a7a182b0-19f2-43a4-a52b-b596471f94ef|1000

@tazarov tazarov changed the title [Bug]: Multi-threading seems to break log_purge [Bug]: Log Purging with multiple collections Jul 31, 2024

tazarov commented Jul 31, 2024

Culprit:

if results:
    min_seq_id = min(self.decode_seq_id(row[0]) for row in results)
else:
    return
t = Table("embeddings_queue")
q = (
    self.querybuilder()
    .from_(t)
    .where(t.seq_id < ParameterValue(min_seq_id))
    .delete()
)
sql, params = get_sql(q, self.parameter_format())
cur.execute(sql, params)

Choosing the minimum seq_id across all collections and deleting everything below it only cleans the WAL up to the position of the least-advanced collection; entries belonging to every other collection remain.
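
For comparison, here is a sketch of one possible fix (an assumption for illustration, not necessarily what the linked PR does): delete each topic's WAL rows up to that topic's own watermark instead of the global minimum. The table layout and the `watermarks` dict are simplified and hypothetical.

```python
import sqlite3

# Simplified stand-in for the embeddings_queue table: 10 rows split across 2 topics.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE embeddings_queue (seq_id INTEGER, topic TEXT)")
conn.executemany(
    "INSERT INTO embeddings_queue VALUES (?, ?)",
    [(s, f"topic-{s % 2}") for s in range(1, 11)],  # 5 rows per topic
)

# Hypothetical per-topic watermarks (highest seq_id already applied to segments).
watermarks = {"topic-0": 10, "topic-1": 3}

# Purge each topic independently, up to its own watermark.
for topic, max_seq in watermarks.items():
    conn.execute(
        "DELETE FROM embeddings_queue WHERE topic = ? AND seq_id < ?",
        (topic, max_seq),
    )

print(conn.execute("SELECT COUNT(*) FROM embeddings_queue").fetchone()[0])  # 5
```

With per-topic deletion, the fully-synced topic keeps only its latest row, and the lagging topic keeps only what it still needs, instead of the lagging topic pinning the entire WAL.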

@atroyn atroyn assigned atroyn and codetheweb and unassigned atroyn Jul 31, 2024

atroyn commented Jul 31, 2024

Neither the description of the issue, nor the title, nor any of the comments explains what the issue actually is, only inputs and outputs. Please explicitly state the problem instead of creating a little puzzle for me to figure out to see if there's even a bug.

@tazarov tazarov changed the title [Bug]: Log Purging with multiple collections [Bug]: Automatic and CLI Log Purging not working with multiple collections Jul 31, 2024
@codetheweb codetheweb linked a pull request Aug 2, 2024 that will close this issue