Skip to content

Commit

Permalink
[ENH] Evict other versions of hnsw index of the collection when anoth…
Browse files Browse the repository at this point in the history
…er version is fetched (#2707)

## Description of changes

*Summarize the changes made by this PR.*
It was seen that query nodes OOM in load tests. One area where we could
optimize the memory footprint is the HNSW index cache. This PR restricts
the cache to keep one entry per collection. Ideally, we would like the
index version of this entry to be of the latest but there is no
guarantee with this PR. Once the versioning changes land, we could
improve this. For e.g. one bad case could be:
For the same collection:
1. get index version v1
2. get index version v2 (> v1)
3. get index version v1 (can happen due to an inflight query that
started before compaction of v2 occured) -- this will evict v2 even
though it is more recent and will be used again in future

## Test plan
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Aug 22, 2024
1 parent 003bf67 commit 9661326
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
33 changes: 27 additions & 6 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use chroma_types::Segment;
use futures::stream;
use futures::stream::StreamExt;
use parking_lot::RwLock;
use rand::seq::index;
use std::fmt::Debug;
use std::path::Path;
use std::{path::PathBuf, sync::Arc};
Expand All @@ -34,6 +35,19 @@ const FILES: [&'static str; 4] = [
"link_lists.bin",
];

// The key of the cache is the collection id and the value is
// the HNSW index for that collection. This restricts the cache to
// contain atmost one index per collection. Ideally, we would like
// this index to be the latest index for that collection but rn it
// is not guaranteed. For e.g. one case could be:
// 1. get index version v1
// 2. get index version v2 (> v1)
// 3. get index version v1 (can happen due to an inflight query
// that started before compaction of v2 occured) -- this will
// evict v2 even though it is more recent and will be used again in future.
// Once we have versioning propagated throughout the system we can make
// this better. We can also do a deferred eviction for such entries when
// their ref count goes to 0.
#[derive(Clone)]
pub struct HnswIndexProvider {
cache: Cache<Uuid, Arc<RwLock<HnswIndex>>>,
Expand Down Expand Up @@ -79,9 +93,16 @@ impl HnswIndexProvider {
}
}

pub fn get(&self, id: &Uuid) -> Option<Arc<RwLock<HnswIndex>>> {
match self.cache.get(id) {
Some(index) => Some(index.clone()),
pub fn get(&self, index_id: &Uuid, collection_id: &Uuid) -> Option<Arc<RwLock<HnswIndex>>> {
match self.cache.get(collection_id) {
Some(index) => {
let index_with_lock = index.read();
if index_with_lock.id == *index_id {
// Clone is cheap because we are just cloning the Arc.
return Some(index.clone());
}
return None;
}
None => None,
}
}
Expand Down Expand Up @@ -144,7 +165,7 @@ impl HnswIndexProvider {
match HnswIndex::load(storage_path_str, &index_config, new_id) {
Ok(index) => {
let index = Arc::new(RwLock::new(index));
self.cache.insert(new_id, index.clone());
self.cache.insert(segment.collection, index.clone());
Ok(index)
}
Err(e) => Err(Box::new(HnswIndexProviderForkError::IndexLoadError(e))),
Expand Down Expand Up @@ -262,7 +283,7 @@ impl HnswIndexProvider {
match HnswIndex::load(index_storage_path.to_str().unwrap(), &index_config, *id) {
Ok(index) => {
let index = Arc::new(RwLock::new(index));
self.cache.insert(*id, index.clone());
self.cache.insert(segment.collection, index.clone());
Ok(index)
}
Err(e) => Err(Box::new(HnswIndexProviderOpenError::IndexLoadError(e))),
Expand Down Expand Up @@ -316,7 +337,7 @@ impl HnswIndexProvider {
}
};
let index = Arc::new(RwLock::new(index));
self.cache.insert(id, index.clone());
self.cache.insert(segment.collection, index.clone());
Ok(index)
}

Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/segment/distributed_hnsw_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl DistributedHNSWSegmentReader {
};

let index =
match hnsw_index_provider.get(&index_uuid) {
match hnsw_index_provider.get(&index_uuid, &segment.collection) {
Some(index) => index,
None => {
match hnsw_index_provider
Expand Down

0 comments on commit 9661326

Please sign in to comment.