avoid unneeded copy on touching but non-overlapping byte ranges #4219

Merged · 3 commits · Dec 4, 2023
160 changes: 150 additions & 10 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -83,20 +83,23 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
let key = CacheKey::from_borrowed(tag, byte_range.start);
let (k, v) = if let Some((k, v)) = self.get_block(&key, byte_range.end) {
(k, v)
} else if let Some((k, v)) = self.merge_ranges(&key, byte_range.end) {
(k, v)
} else {
self.cache_counters.misses_num_items.inc();
return None;
};

let start = byte_range.start - k.range_start;
let end = byte_range.end - k.range_start;
let result = v.bytes.slice(start..end);

self.cache_counters.hits_num_items.inc();
self.cache_counters
.hits_num_bytes
.inc_by((end - start) as u64);

Some(v.bytes.slice(start..end))
Some(result)
}

fn put_slice(&mut self, tag: T::Owned, byte_range: Range<usize>, bytes: OwnedBytes) {
@@ -106,13 +109,13 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
return;
}

// try to find a block with which we overlap (and not just touch)
let start_key = CacheKey::from_borrowed(tag.borrow(), byte_range.start);
let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end);

let first_matching_block = self
.get_block(&start_key, byte_range.start)
.get_block(&start_key, byte_range.start + 1)
.map(|(k, _v)| k);

let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end - 1);
Contributor:
My understanding is that with this, we remove (as opposed to merge) any data that overlaps with (not just data that is included in) the slice we are trying to put.

Is my understanding correct?
If so, can we comment this more clearly?

Possibly isolate it in a different method:
fn remove_overlapping_slices(&mut self, ....)

Contributor (Author):
I'm not sure I understand your query.
In this function, the only change is to what we consider for merging.
Before, if someone put [10..15), we would search for anything whose starting or ending bound (or both) is in [10..=15]. This includes [5..10) and [15..20).
After, we only consider things in [11..=14], that is, things with which we share at least one byte. If we share something, the merge happens as before; if the new rule says we don't overlap, the range is added to the cache without merging anything.
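
A minimal sketch of the boundary arithmetic being discussed, assuming half-open byte ranges; the overlaps helper is illustrative and not part of this PR:

use std::ops::Range;

// Two half-open ranges share at least one byte iff each starts before the
// other ends; ranges that merely touch do not.
fn overlaps(a: &Range<usize>, b: &Range<usize>) -> bool {
    a.start < b.end && b.start < a.end
}

fn main() {
    let put = 10..15;
    assert!(!overlaps(&put, &(5..10)));  // touches at 10: left alone after this PR
    assert!(!overlaps(&put, &(15..20))); // touches at 15: left alone after this PR
    assert!(overlaps(&put, &(8..12)));   // shares bytes 10 and 11: merged as before
}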

Contributor:
Yes, with a correction:
we merge any data that overlaps with (not just touches) the slice we are trying to put.

Could we comment what we are doing, or even better, isolate it into a method?
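
For intuition, a self-contained toy model of what an isolated merge step could look like, over a BTreeMap keyed by range start. The name put_merging and the simplified structure are assumptions for illustration, not the crate's API:

use std::collections::BTreeMap;
use std::ops::Range;

// Toy model: cached blocks keyed by start offset, values are end offsets.
// Insert [range.start, range.end), merging only blocks sharing >= 1 byte.
fn put_merging(blocks: &mut BTreeMap<usize, usize>, range: Range<usize>) {
    let (mut start, mut end) = (range.start, range.end);
    // collect the starts of blocks overlapping [start, end) by at least one byte
    let overlapping: Vec<usize> = blocks
        .range(..end) // a candidate must start before our end...
        .filter(|(_, &e)| e > start) // ...and end after our start
        .map(|(&s, _)| s)
        .collect();
    for s in overlapping {
        let e = blocks.remove(&s).unwrap();
        start = start.min(s);
        end = end.max(e);
    }
    blocks.insert(start, end);
}

fn main() {
    let mut blocks = BTreeMap::new();
    put_merging(&mut blocks, 5..10);
    put_merging(&mut blocks, 10..15); // merely touches [5..10): kept separate
    assert_eq!(blocks.len(), 2);
    put_merging(&mut blocks, 8..12); // overlaps both: collapses to [5..15)
    assert_eq!(blocks, BTreeMap::from([(5, 15)]));
}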

let last_matching_block = self.get_block(&end_key, byte_range.end).map(|(k, _v)| k);

if first_matching_block.is_some() && first_matching_block == last_matching_block {
@@ -140,15 +143,19 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
.unwrap_or(true);

let (final_range, final_bytes) = if can_drop_first && can_drop_last {
// if we are here, either there was no overlapping block, or there was, but this buffer
// entirely covers every block it overlapped with. There is no merging to do.
(byte_range, bytes)
} else {
// if we are here, we have to do some merging

// first find the final buffer start and end position.
let start = if can_drop_first {
byte_range.start
} else {
// if overlapping were empty, can_drop_first would be true, so this unwrap is safe
overlapping.first().unwrap().start
};

let end = if can_drop_last {
byte_range.end
} else {
@@ -158,6 +165,8 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {

let mut buffer = Vec::with_capacity(end - start);

// if this buffer overlaps, but does not contain, the 1st buffer, copy the
// non-overlapping part at the start of the final buffer.
if !can_drop_first {
let first_range = overlapping.first().unwrap();
let key = CacheKey::from_borrowed(tag.borrow(), first_range.start);
@@ -167,8 +176,11 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
buffer.extend_from_slice(&block.bytes[..len]);
}

// copy the entire current buffer
buffer.extend_from_slice(&bytes);

// if this buffer overlaps, but does not contain, the last buffer, copy the
// non-overlapping part at the end of the final buffer.
if !can_drop_last {
let last_range = overlapping.last().unwrap();
let key = CacheKey::from_borrowed(tag.borrow(), last_range.start);
@@ -178,6 +190,7 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
buffer.extend_from_slice(&block.bytes[start..]);
}

// sanity check, we copied as much as expected
debug_assert_eq!(end - start, buffer.len());

(start..end, OwnedBytes::new(buffer))
@@ -187,11 +200,14 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
// in the loop. It works with .get() instead of .remove() (?).
let mut key = CacheKey::from_owned(tag, 0);
for range in overlapping.into_iter() {
// remove every block with which we overlapped, including the 1st and last, as they
// were included as prefix/suffix to the final block.
key.range_start = range.start;
self.cache.remove(&key);
self.update_counter_drop_item(range.end - range.start);
}

// and finally insert the newly added buffer
key.range_start = final_range.start;
let value = CacheValue {
range_end: final_range.end,
@@ -202,17 +218,90 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
}

// Return a block that contains everything between query.range_start and range_end
fn get_block<'a, 'b: 'a>(
&'a self,
query: &CacheKey<'b, T>,
fn get_block<'a>(
&self,
query: &CacheKey<'a, T>,
range_end: usize,
) -> Option<(&CacheKey<'_, T>, &CacheValue)> {
) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
self.cache
.range(..=query)
.next_back()
.filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
}

/// Try to merge all blocks in the given range. Fails if some bytes were not already stored.
fn merge_ranges<'a>(
&mut self,
start: &CacheKey<'a, T>,
range_end: usize,
) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
let own_key = |key: &CacheKey<T>| {
CacheKey::from_owned(T::borrow(&key.tag).to_owned(), key.range_start)
};

let first_block = self.get_block(start, start.range_start)?;

// query cache for all blocks which overlap with our query
let overlapping_blocks = self
.cache
.range(first_block.0..)
.take_while(|(k, _)| k.tag == start.tag && k.range_start <= range_end);

// verify there are no holes and that each range touches the next one. There can't be overlap
// due to how we fill our data structure.
let mut last_block = first_block;
for (k, v) in overlapping_blocks.clone().skip(1) {
if k.range_start != last_block.1.range_end {
return None;
}

last_block = (k, v);
}
if last_block.1.range_end < range_end {
// we got a gap at the end
return None;
}

// we have everything we need. Merge every sub-buffer into a single large buffer.
let mut buffer = Vec::with_capacity(last_block.1.range_end - first_block.0.range_start);
let mut part_count = 0i64;
for (_, v) in overlapping_blocks {
part_count += 1;
buffer.extend_from_slice(&v.bytes);
}
assert_eq!(
buffer.len(),
(last_block.1.range_end - first_block.0.range_start)
);

let new_key = own_key(first_block.0);
let new_value = CacheValue {
range_end: last_block.1.range_end,
bytes: OwnedBytes::new(buffer),
};

// cleanup is sub-optimal, we'd need a BTreeMap::drain_range or something like that
let last_key = own_key(last_block.0);

// remove previous buffers from the cache
let blocks_to_remove: Vec<_> = self
.cache
.range(&new_key..=&last_key)
.map(|(k, _)| own_key(k))
.collect();
for block in blocks_to_remove {
self.cache.remove(&block);
}

// and insert the new merged buffer
self.cache.insert(new_key, new_value);

self.num_items -= (part_count - 1) as u64;
self.cache_counters.in_cache_count.sub(part_count - 1);

self.get_block(start, range_end)
}

fn update_counter_record_item(&mut self, num_bytes: usize) {
self.num_items += 1;
self.num_bytes += num_bytes as u64;
@@ -352,7 +441,9 @@ mod tests {
count_items(tagged_state)
})
.sum();
assert_eq!(cache.inner.lock().unwrap().num_items, expected_item_count as u64);
// in some cases we have ranges touching each other; count_items counts them
// as only one, but the cache counts them as 2.
assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64);

let expected_byte_count = state.values()
.flatten()
@@ -391,4 +482,53 @@
})
.1
}

#[test]
fn test_byte_range_cache_doesnt_merge_unnecessarily() {
let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS);

let key: std::path::PathBuf = "key".into();

cache.put_slice(
key.clone(),
0..5,
OwnedBytes::new((0..5).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
5..10,
OwnedBytes::new((5..10).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
10..15,
OwnedBytes::new((10..15).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
15..20,
OwnedBytes::new((15..20).collect::<Vec<_>>()),
);

{
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 4);
assert_eq!(mutable_cache.num_items, 4);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}

cache.get_slice(&key, 3..12).unwrap();

{
// the first three blocks should now have been merged; the last one is untouched
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 2);
assert_eq!(mutable_cache.num_items, 2);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}
}
}
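
Taken together, the diff and the test show the intended division of labor: put_slice no longer merges blocks that merely touch, while get_slice (via merge_ranges) coalesces touching blocks lazily once a read actually spans them. Here 3..12 folds [0..5), [5..10) and [10..15) into a single [0..15) block and leaves [15..20) untouched, so the byte count stays at 20 while the item count drops to 2.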