avoid unneeded copy on touching but non-overlapping byte ranges
trinity-1686a committed Nov 30, 2023
1 parent 546df3d commit 904dfd9
Showing 1 changed file with 132 additions and 9 deletions.
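
Before this commit, put_slice treated a stored block that merely ended where the new slice begins as overlapping, so a put of a touching range triggered an eager merge and copy. This change keeps touching blocks separate on write and merges them lazily in get_slice, via the new merge_ranges below, only when a read actually spans a block boundary. A minimal sketch of the resulting behavior, assuming ByteRangeCache and OwnedBytes are reachable the same way the tests in this file reach them:

```rust
use quickwit_storage::{ByteRangeCache, OwnedBytes};

// `cache` is built as in the new test below, e.g. with
// ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS).
fn touching_ranges_sketch(cache: &ByteRangeCache) {
    let key: std::path::PathBuf = "key".into();

    // The two ranges touch at byte 5 but do not overlap: with this commit the
    // second put no longer copies both blocks into a single allocation.
    cache.put_slice(key.clone(), 0..5, OwnedBytes::new(vec![0, 1, 2, 3, 4]));
    cache.put_slice(key.clone(), 5..10, OwnedBytes::new(vec![5, 6, 7, 8, 9]));

    // A read crossing the boundary still succeeds: get_slice falls back to
    // merge_ranges, which stitches the touching blocks into one block.
    let bytes = cache.get_slice(&key, 3..8).expect("touching blocks merge on read");
    assert_eq!(&bytes[..], &[3, 4, 5, 6, 7]);
}
```

The new test at the bottom of the diff checks exactly this: four touching puts stay four separate blocks until a read spanning 3..12 merges the first three of them into one.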
141 changes: 132 additions & 9 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -83,20 +83,23 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
let key = CacheKey::from_borrowed(tag, byte_range.start);
let (k, v) = if let Some((k, v)) = self.get_block(&key, byte_range.end) {
(k, v)
} else if let Some((k, v)) = self.merge_ranges(&key, byte_range.end) {
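// a direct lookup missed, but touching cached blocks were merged into one block covering the range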
(k, v)
} else {
self.cache_counters.misses_num_items.inc();
return None;
};

let start = byte_range.start - k.range_start;
let end = byte_range.end - k.range_start;
let result = v.bytes.slice(start..end);

self.cache_counters.hits_num_items.inc();
self.cache_counters
.hits_num_bytes
.inc_by((end - start) as u64);

- Some(v.bytes.slice(start..end))
+ Some(result)
}

fn put_slice(&mut self, tag: T::Owned, byte_range: Range<usize>, bytes: OwnedBytes) {
@@ -106,13 +109,13 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
return;
}

// try to find a block with which we overlap (and not just touch)
let start_key = CacheKey::from_borrowed(tag.borrow(), byte_range.start);
- let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end);

let first_matching_block = self
- .get_block(&start_key, byte_range.start)
+ .get_block(&start_key, byte_range.start + 1) // +1: a block that only touches at byte_range.start is not an overlap
.map(|(k, _v)| k);

+ let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end - 1);
let last_matching_block = self.get_block(&end_key, byte_range.end).map(|(k, _v)| k);

if first_matching_block.is_some() && first_matching_block == last_matching_block {
@@ -202,17 +205,86 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
}

// Return a block that contains everything between query.range_start and range_end
- fn get_block<'a, 'b: 'a>(
- &'a self,
- query: &CacheKey<'b, T>,
+ fn get_block<'a>(
+ &self,
+ query: &CacheKey<'a, T>,
range_end: usize,
- ) -> Option<(&CacheKey<'_, T>, &CacheValue)> {
+ ) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
self.cache
.range(..=query)
.next_back()
.filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
}

/// Try to merge all blocks in the given range into one. Returns None if some bytes in the range were not already stored.
fn merge_ranges<'a>(
&mut self,
start: &CacheKey<'a, T>,
range_end: usize,
) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
let own_key = |key: &CacheKey<T>| {
CacheKey::from_owned(T::borrow(&key.tag).to_owned(), key.range_start)
};

// find the cached block that contains, or ends exactly at, the start of the requested range
let first_block = self.get_block(start, start.range_start)?;

let overlapping_blocks = self
.cache
.range(first_block.0..)
.take_while(|(k, _)| k.tag == start.tag && k.range_start <= range_end);

let mut last_block = first_block;
for (k, v) in overlapping_blocks.clone().skip(1) {
if k.range_start != last_block.1.range_end {
// blocks can't overlap by construction, so this must be a gap
return None;
}

last_block = (k, v);
}
if last_block.1.range_end < range_end {
// the cached blocks stop short of range_end: gap at the end
return None;
}

// we have everything we need
let mut buffer = Vec::with_capacity(last_block.1.range_end - first_block.0.range_start);
let mut part_count = 0i64;
for (_, v) in overlapping_blocks {
part_count += 1;
buffer.extend_from_slice(&v.bytes);
}
assert_eq!(
buffer.len(),
(last_block.1.range_end - first_block.0.range_start)
);

let new_key = own_key(first_block.0);
let new_value = CacheValue {
range_end: last_block.1.range_end,
bytes: OwnedBytes::new(buffer),
};

// cleanup is sub-optimal, we'd need a BTreeMap::drain_range or something like that
let last_key = own_key(last_block.0);

let blocks_to_remove: Vec<_> = self
.cache
.range(&new_key..=&last_key)
.map(|(k, _)| own_key(k))
.collect();
for block in blocks_to_remove {
self.cache.remove(&block);
}

self.cache.insert(new_key, new_value);

// part_count blocks were replaced by a single one; fix up the item counters
self.num_items -= (part_count - 1) as u64;
self.cache_counters.in_cache_count.sub(part_count - 1);

self.get_block(start, range_end)
}

fn update_counter_record_item(&mut self, num_bytes: usize) {
self.num_items += 1;
self.num_bytes += num_bytes as u64;
@@ -352,7 +424,9 @@ mod tests {
count_items(tagged_state)
})
.sum();
- assert_eq!(cache.inner.lock().unwrap().num_items, expected_item_count as u64);
+ // in some cases ranges touch each other: count_items counts them as one,
+ // but the cache counts them as two
+ assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64);

let expected_byte_count = state.values()
.flatten()
@@ -391,4 +465,53 @@ mod tests {
})
.1
}

#[test]
fn test_byte_range_cache_doesnt_merge_unnecessarily() {
let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS);

let key: std::path::PathBuf = "key".into();

cache.put_slice(
key.clone(),
0..5,
OwnedBytes::new((0..5).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
5..10,
OwnedBytes::new((5..10).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
10..15,
OwnedBytes::new((10..15).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
15..20,
OwnedBytes::new((15..20).collect::<Vec<_>>()),
);

{
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 4);
assert_eq!(mutable_cache.num_items, 4);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}

cache.get_slice(&key, 3..12).unwrap();

{
// now they should've been merged, except the last one
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 2);
assert_eq!(mutable_cache.num_items, 2);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}
}
}
