From 904dfd99f4a01d5ef6d522df6bb3227f3c1e5c5a Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Thu, 30 Nov 2023 14:30:29 +0100
Subject: [PATCH] avoid unneeded copy on touching but non-overlapping byte
 ranges

---
 .../src/cache/byte_range_cache.rs             | 141 ++++++++++++++++--
 1 file changed, 132 insertions(+), 9 deletions(-)

diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
index 9a6200f3cb2..309b8256711 100644
--- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -83,6 +83,8 @@ impl NeedMutByteRangeCache {
         let key = CacheKey::from_borrowed(tag, byte_range.start);
         let (k, v) = if let Some((k, v)) = self.get_block(&key, byte_range.end) {
             (k, v)
+        } else if let Some((k, v)) = self.merge_ranges(&key, byte_range.end) {
+            (k, v)
         } else {
             self.cache_counters.misses_num_items.inc();
             return None;
@@ -90,13 +92,14 @@ impl NeedMutByteRangeCache {
 
         let start = byte_range.start - k.range_start;
         let end = byte_range.end - k.range_start;
+        let result = v.bytes.slice(start..end);
 
         self.cache_counters.hits_num_items.inc();
         self.cache_counters
             .hits_num_bytes
             .inc_by((end - start) as u64);
 
-        Some(v.bytes.slice(start..end))
+        Some(result)
     }
 
     fn put_slice(&mut self, tag: T::Owned, byte_range: Range<usize>, bytes: OwnedBytes) {
@@ -106,13 +109,13 @@ impl NeedMutByteRangeCache {
             return;
         }
 
+        // try to find a block with which we overlap (and not just touch)
         let start_key = CacheKey::from_borrowed(tag.borrow(), byte_range.start);
-        let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end);
-
         let first_matching_block = self
-            .get_block(&start_key, byte_range.start)
+            .get_block(&start_key, byte_range.start + 1)
             .map(|(k, _v)| k);
 
+        let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end - 1);
         let last_matching_block = self.get_block(&end_key, byte_range.end).map(|(k, _v)| k);
 
         if first_matching_block.is_some() && first_matching_block == last_matching_block {
@@ -202,17 +205,86 @@ impl NeedMutByteRangeCache {
     }
 
     // Return a block that contain everything between query.range_start and range_end
-    fn get_block<'a, 'b: 'a>(
-        &'a self,
-        query: &CacheKey<'b, T>,
+    fn get_block<'a>(
+        &self,
+        query: &CacheKey<'a, T>,
         range_end: usize,
-    ) -> Option<(&CacheKey<'_, T>, &CacheValue)> {
+    ) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
         self.cache
             .range(..=query)
             .next_back()
             .filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
     }
 
+    /// Try to merge all blocks in the given range. Fails if some bytes were not already stored.
+    fn merge_ranges<'a>(
+        &mut self,
+        start: &CacheKey<'a, T>,
+        range_end: usize,
+    ) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
+        let own_key = |key: &CacheKey<T>| {
+            CacheKey::from_owned(T::borrow(&key.tag).to_owned(), key.range_start)
+        };
+
+        let first_block = self.get_block(start, start.range_start)?;
+
+        let overlapping_blocks = self
+            .cache
+            .range(first_block.0..)
+            .take_while(|(k, _)| k.tag == start.tag && k.range_start <= range_end);
+
+        let mut last_block = first_block;
+        for (k, v) in overlapping_blocks.clone().skip(1) {
+            if k.range_start != last_block.1.range_end {
+                // blocks can't overlap by construction, so this must be a gap
+                return None;
+            }
+
+            last_block = (k, v);
+        }
+        if last_block.1.range_end < range_end {
+            // there is a gap at the end
+            return None;
+        }
+
+        // we have everything we need
+        let mut buffer = Vec::with_capacity(last_block.1.range_end - first_block.0.range_start);
+        let mut part_count = 0i64;
+        for (_, v) in overlapping_blocks {
+            part_count += 1;
+            buffer.extend_from_slice(&v.bytes);
+        }
+        assert_eq!(
+            buffer.len(),
+            (last_block.1.range_end - first_block.0.range_start)
+        );
+
+        let new_key = own_key(first_block.0);
+        let new_value = CacheValue {
+            range_end: last_block.1.range_end,
+            bytes: OwnedBytes::new(buffer),
+        };
+
+        // cleanup is sub-optimal, we'd need a BTreeMap::drain_range or something like that
+        let last_key = own_key(last_block.0);
+
+        let blocks_to_remove: Vec<_> = self
+            .cache
+            .range(&new_key..=&last_key)
+            .map(|(k, _)| own_key(k))
+            .collect();
+        for block in blocks_to_remove {
+            self.cache.remove(&block);
+        }
+
+        self.cache.insert(new_key, new_value);
+
+        self.num_items -= (part_count - 1) as u64;
+        self.cache_counters.in_cache_count.sub(part_count - 1);
+
+        self.get_block(start, range_end)
+    }
+
     fn update_counter_record_item(&mut self, num_bytes: usize) {
         self.num_items += 1;
         self.num_bytes += num_bytes as u64;
@@ -352,7 +424,9 @@ mod tests {
                 count_items(tagged_state)
             })
             .sum();
-        assert_eq!(cache.inner.lock().unwrap().num_items, expected_item_count as u64);
+        // in some cases ranges touch each other; count_items counts them as one,
+        // but the cache counts them as two.
+        assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64);
 
         let expected_byte_count = state.values()
             .flatten()
@@ -391,4 +465,53 @@ mod tests {
             })
             .1
     }
+
+    #[test]
+    fn test_byte_range_cache_doesnt_merge_unnecessarily() {
+        let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS);
+
+        let key: std::path::PathBuf = "key".into();
+
+        cache.put_slice(
+            key.clone(),
+            0..5,
+            OwnedBytes::new((0..5).collect::<Vec<u8>>()),
+        );
+        cache.put_slice(
+            key.clone(),
+            5..10,
+            OwnedBytes::new((5..10).collect::<Vec<u8>>()),
+        );
+        cache.put_slice(
+            key.clone(),
+            10..15,
+            OwnedBytes::new((10..15).collect::<Vec<u8>>()),
+        );
+        cache.put_slice(
+            key.clone(),
+            15..20,
+            OwnedBytes::new((15..20).collect::<Vec<u8>>()),
+        );
+
+        {
+            let mutable_cache = cache.inner.lock().unwrap();
+            assert_eq!(mutable_cache.cache.len(), 4);
+            assert_eq!(mutable_cache.num_items, 4);
+            assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
+            assert_eq!(mutable_cache.num_bytes, 20);
+            assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
+        }
+
+        cache.get_slice(&key, 3..12).unwrap();
+
+        {
+            // now they should have been merged, except the last one
+            let mutable_cache = cache.inner.lock().unwrap();
+            assert_eq!(mutable_cache.cache.len(), 2);
+            assert_eq!(mutable_cache.num_items, 2);
+            assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
+            assert_eq!(mutable_cache.num_bytes, 20);
+            assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
+        }
+    }
 }
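
For illustration, here is a minimal, self-contained sketch of the merge-on-read idea that the merge_ranges fallback above implements: blocks live in a BTreeMap keyed by start offset, never overlapping but possibly touching, and a read that spans several touching blocks consolidates them into one entry so later reads copy nothing. This is not the quickwit implementation; ToyCache, put and get are made-up names, the tag/metrics/OwnedBytes machinery is omitted, and plain Vec<u8> stands in for the cached bytes.

use std::collections::BTreeMap;
use std::ops::Range;

/// Toy model of the byte-range cache: blocks keyed by their start offset.
/// Blocks never overlap, but they may touch (end of one == start of the next).
struct ToyCache {
    blocks: BTreeMap<usize, Vec<u8>>,
}

impl ToyCache {
    fn new() -> Self {
        ToyCache { blocks: BTreeMap::new() }
    }

    fn put(&mut self, start: usize, bytes: Vec<u8>) {
        self.blocks.insert(start, bytes);
    }

    /// Serve `range`, merging touching blocks into a single entry when the
    /// request spans several of them; return None if any byte is missing.
    fn get(&mut self, range: Range<usize>) -> Option<Vec<u8>> {
        // Candidate block that could contain `range.start`.
        let (&first_start, first) = self.blocks.range(..=range.start).next_back()?;
        if first_start + first.len() < range.start {
            return None; // the request starts in a gap
        }

        // Walk forward while blocks touch each other, concatenating them.
        let mut merged = first.clone();
        let mut absorbed = Vec::new();
        for (&start, bytes) in self.blocks.range(first_start + 1..) {
            let covered_up_to = first_start + merged.len();
            if covered_up_to >= range.end {
                break; // the requested range is already covered
            }
            if start != covered_up_to {
                return None; // gap: some bytes were never stored
            }
            merged.extend_from_slice(bytes);
            absorbed.push(start);
        }
        if first_start + merged.len() < range.end {
            return None; // gap at the end
        }

        // Replace the individual blocks with the single merged block so that
        // later reads over the same area hit one entry and copy nothing.
        for start in absorbed {
            self.blocks.remove(&start);
        }
        self.blocks.insert(first_start, merged.clone());

        Some(merged[range.start - first_start..range.end - first_start].to_vec())
    }
}

fn main() {
    let mut cache = ToyCache::new();
    cache.put(0, (0..5).collect());
    cache.put(5, (5..10).collect());
    cache.put(10, (10..15).collect());
    cache.put(15, (15..20).collect());

    // Reading 3..12 touches the first three blocks: they get merged into one,
    // mirroring test_byte_range_cache_doesnt_merge_unnecessarily above.
    assert_eq!(cache.get(3..12), Some((3..12).collect()));
    assert_eq!(cache.blocks.len(), 2); // one merged block 0..15, plus 15..20
}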