From 5be8b477a472e35a31cbe054649badeeb3747356 Mon Sep 17 00:00:00 2001 From: Fullstop000 Date: Sat, 18 Jan 2020 23:14:11 +0800 Subject: [PATCH] add some basic db test Signed-off-by: Fullstop000 --- src/batch.rs | 37 +- src/cache/mod.rs | 1 - src/compaction.rs | 13 +- src/db/format.rs | 2 +- src/db/mod.rs | 780 ++++++++++++++++++++++++++++++------- src/record/mod.rs | 1 - src/record/writer.rs | 2 +- src/sstable/mod.rs | 2 +- src/sstable/table.rs | 1 - src/storage/mem.rs | 33 +- src/table_cache.rs | 9 +- src/version/mod.rs | 86 +++- src/version/version_set.rs | 19 +- 13 files changed, 763 insertions(+), 223 deletions(-) diff --git a/src/batch.rs b/src/batch.rs index a7eb8bd..77149a1 100644 --- a/src/batch.rs +++ b/src/batch.rs @@ -54,17 +54,19 @@ pub const HEADER_SIZE: usize = 12; /// non-const method, all threads accessing the same WriteBatch must use /// external synchronization. /// -#[derive(Clone, Default)] +#[derive(Clone)] pub struct WriteBatch { pub(super) contents: Vec, } -impl WriteBatch { - pub fn new() -> Self { +impl Default for WriteBatch { + fn default() -> Self { let contents = vec![0; HEADER_SIZE]; Self { contents } } +} +impl WriteBatch { #[inline] pub fn data(&self) -> &[u8] { self.contents.as_slice() @@ -168,30 +170,33 @@ impl WriteBatch { self.contents.clear(); self.contents.append(src); } + + /// Returns the number of entires included in this entry #[inline] pub fn get_count(&self) -> u32 { - decode_fixed_32(&self.contents.as_slice()[8..]) + decode_fixed_32(&self.contents[8..]) } #[inline] pub(crate) fn set_count(&mut self, count: u32) { - let s = self.contents.as_mut_slice(); - encode_fixed_32(&mut s[8..], count) + encode_fixed_32(&mut self.contents[8..], count) } #[inline] pub(crate) fn set_sequence(&mut self, seq: u64) { - encode_fixed_64(self.contents.as_mut_slice(), seq) + encode_fixed_64(&mut self.contents, seq) } + /// Returns the seq number of this batch #[inline] pub fn get_sequence(&self) -> u64 { - decode_fixed_64(self.contents.as_slice()) + decode_fixed_64(&self.contents) } + /// Returns false when this batch contains no entries to be written #[inline] pub fn is_empty(&self) -> bool { - self.contents.is_empty() + self.get_count() == 0 } } @@ -244,14 +249,14 @@ mod tests { #[test] fn test_empty_batch() { - let b = WriteBatch::new(); + let b = WriteBatch::default(); assert_eq!("", print_contents(&b).as_str()); - assert_eq!(0, b.get_count()); + assert!(b.is_empty()); } #[test] fn test_multiple_records() { - let mut b = WriteBatch::new(); + let mut b = WriteBatch::default(); b.put("foo".as_bytes(), "bar".as_bytes()); b.delete("box".as_bytes()); b.put("baz".as_bytes(), "boo".as_bytes()); @@ -266,7 +271,7 @@ mod tests { #[test] fn test_corrupted_batch() { - let mut b = WriteBatch::new(); + let mut b = WriteBatch::default(); b.put("foo".as_bytes(), "bar".as_bytes()); b.delete("box".as_bytes()); b.set_sequence(200); @@ -279,8 +284,8 @@ mod tests { #[test] fn test_append_batch() { - let mut b1 = WriteBatch::new(); - let mut b2 = WriteBatch::new(); + let mut b1 = WriteBatch::default(); + let mut b2 = WriteBatch::default(); b1.set_sequence(200); b2.set_sequence(300); b1.append(b2.clone()); @@ -302,7 +307,7 @@ mod tests { #[test] fn test_approximate_size() { - let mut b = WriteBatch::new(); + let mut b = WriteBatch::default(); let empty_size = b.approximate_size(); b.put("foo".as_bytes(), "bar".as_bytes()); let one_key_size = b.approximate_size(); diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 7ac9644..25e2908 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs 
@@ -68,7 +68,6 @@ pub trait Cache { /// Release a mapping returned by a previous `look_up()`. /// REQUIRES: handle must not have been released yet. - /// REQUIRES: handle must have been returned by a method on *this. fn release(&self, handle: HandleRef); /// If the cache contains entry for key, erase it. Note that the diff --git a/src/compaction.rs b/src/compaction.rs index 262199b..ecc58d2 100644 --- a/src/compaction.rs +++ b/src/compaction.rs @@ -26,16 +26,27 @@ use crate::version::version_edit::{FileMetaData, VersionEdit}; use crate::version::version_set::{total_file_size, FileIterFactory}; use crate::version::{LevelFileNumIterator, Version}; use std::cmp::Ordering as CmpOrdering; +use std::sync::atomic::AtomicBool; use std::sync::Arc; /// Information for a manual compaction +#[derive(Clone)] pub struct ManualCompaction { pub level: usize, - pub done: bool, + pub done: Arc, pub begin: Option, // None means beginning of key range pub end: Option, // None means end of key range } +impl PartialEq for ManualCompaction { + fn eq(&self, other: &ManualCompaction) -> bool { + Arc::ptr_eq(&self.done, &other.done) + && self.level == other.level + && self.begin == other.begin + && self.end == other.end + } +} + /// A helper enum describing relations between the indexes of `inputs` in `Compaction` // TODO: use const instead pub enum CompactionInputsRelation { diff --git a/src/db/format.rs b/src/db/format.rs index e5dc8f6..19d6bc4 100644 --- a/src/db/format.rs +++ b/src/db/format.rs @@ -170,7 +170,7 @@ impl InternalKey { #[inline] pub fn user_key(&self) -> &[u8] { let length = self.data.len(); - &self.data.as_slice()[..length - INTERNAL_KEY_TAIL] + &self.data[..length - INTERNAL_KEY_TAIL] } /// Returns a `ParsedInternalKey` diff --git a/src/db/mod.rs b/src/db/mod.rs index fcecd65..c8ad9d4 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -16,10 +16,11 @@ pub mod format; pub mod iterator; use crate::batch::{WriteBatch, HEADER_SIZE}; -use crate::compaction::{Compaction, CompactionInputsRelation}; +use crate::compaction::{Compaction, CompactionInputsRelation, ManualCompaction}; use crate::db::filename::{generate_filename, parse_filename, update_current, FileType}; use crate::db::format::{ - InternalKey, InternalKeyComparator, LookupKey, ParsedInternalKey, ValueType, + InternalKey, InternalKeyComparator, LookupKey, ParsedInternalKey, ValueType, MAX_KEY_SEQUENCE, + VALUE_TYPE_FOR_SEEK, }; use crate::db::iterator::DBIterator; use crate::iterator::{Iterator, MergingIterator}; @@ -35,6 +36,7 @@ use crate::util::reporter::LogReporter; use crate::util::slice::Slice; use crate::version::version_edit::{FileMetaData, VersionEdit}; use crate::version::version_set::VersionSet; +use crate::version::Version; use crate::{Error, Result}; use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::sync::ShardedLock; @@ -67,7 +69,7 @@ pub trait DB { /// `delete` deletes the value for the given key. It returns `Status::NotFound` if /// the DB does not contain the key. - fn delete(&self, write_opt: WriteOptions, key: Slice) -> Result<()>; + fn delete(&self, write_opt: WriteOptions, key: &[u8]) -> Result<()>; /// `write` applies the operations contained in the `WriteBatch` to the DB atomically. 
fn write(&self, write_opt: WriteOptions, batch: WriteBatch) -> Result<()>; @@ -95,7 +97,7 @@ impl DB for WickDB { type Iterator = DBIterator, S>; fn put(&self, options: WriteOptions, key: &[u8], value: &[u8]) -> Result<()> { - let mut batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); batch.put(key, value); self.write(options, batch) } @@ -127,14 +129,14 @@ impl DB for WickDB { DBIterator::new(iter, self.inner.clone(), sequence, ucmp) } - fn delete(&self, options: WriteOptions, key: Slice) -> Result<()> { - let mut batch = WriteBatch::new(); - batch.delete(key.as_slice()); + fn delete(&self, options: WriteOptions, key: &[u8]) -> Result<()> { + let mut batch = WriteBatch::default(); + batch.delete(key); self.write(options, batch) } fn write(&self, options: WriteOptions, batch: WriteBatch) -> Result<()> { - self.inner.schedule_batch_and_wait(options, batch) + self.inner.schedule_batch_and_wait(options, batch, false) } fn close(&mut self) -> Result<()> { @@ -178,16 +180,27 @@ impl WickDB { versions.log_and_apply(&mut edit)?; } + let current = versions.current(); db.delete_obsolete_files(versions); let wick_db = WickDB { inner: Arc::new(db), }; wick_db.process_compaction(); wick_db.process_batch(); - wick_db.inner.maybe_schedule_compaction(); + wick_db.inner.maybe_schedule_compaction(current); Ok(wick_db) } + /// Compact the underlying storage for the key range `[begin, end]`. + pub fn compact_range(&self, begin: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> { + self.inner.compact_range(begin, end) + } + + /// Returns the inner background error + pub fn take_background_err(&self) -> Option { + self.inner.take_bg_error() + } + // The thread take batches from the queue and apples them into memtable and WAL. // // Steps: @@ -204,103 +217,87 @@ impl WickDB { if db.is_shutting_down.load(Ordering::Acquire) { break; } - let mut queue = db.batch_queue.lock().unwrap(); - while queue.is_empty() { - queue = db.process_batch_sem.wait(queue).unwrap(); - } - let first = queue.pop_front().unwrap(); - let mut size = first.batch.approximate_size(); - - // Allow the group to grow up to a maximum size, but if the - // original write is small, limit the growth so we do not slow - // down the small write too much - let mut max_size = 1 << 20; - if size <= 128 << 10 { - max_size = size + (128 << 10) - } - let mut signals = vec![]; - signals.push(first.signal.clone()); - let mut grouped = first; - - // Group several batches from queue - while !queue.is_empty() { - let current = queue.pop_front().unwrap(); - if current.options.sync && !grouped.options.sync { - // Do not include a sync write into a batch handled by a non-sync write. 
- queue.push_front(current); - break; + let first = { + let mut queue = db.batch_queue.lock().unwrap(); + while queue.is_empty() { + // yields current thread and unlock queue + queue = db.process_batch_sem.wait(queue).unwrap(); } - size += current.batch.approximate_size(); - if size > max_size { - // Do not make batch too big - break; - } - grouped.batch.append(current.batch); - signals.push(current.signal.clone()); - } - // Release the queue lock - mem::drop(queue); - match db.make_room_for_write(false) { + queue.pop_front().unwrap() + }; + let force = first.force_mem_compaction; + // TODO: The VersionSet is locked when processing `make_room_for_write` + match db.make_room_for_write(force) { Ok(mut versions) => { - let mut last_seq = versions.last_sequence(); - grouped.batch.set_sequence(last_seq + 1); - last_seq += u64::from(grouped.batch.get_count()); - // must initialize the WAL writer after `make_room_for_write` - let writer = versions.record_writer.as_mut().unwrap(); - let mut res = writer.add_record(grouped.batch.data()); - let mut sync_err = false; - if res.is_ok() && grouped.options.sync { - res = writer.sync(); - if res.is_err() { - sync_err = true; + let (mut grouped, signals) = db.group_batches(first); + if !grouped.batch.is_empty() { + let mut last_seq = versions.last_sequence(); + grouped.batch.set_sequence(last_seq + 1); + last_seq += u64::from(grouped.batch.get_count()); + // `record_writer` must be initialized here + let writer = versions.record_writer.as_mut().unwrap(); + let mut res = writer.add_record(grouped.batch.data()); + let mut sync_err = false; + if res.is_ok() && grouped.options.sync { + res = writer.sync(); + if res.is_err() { + sync_err = true; + } } - } - if res.is_ok() { - let memtable = db.mem.read().unwrap(); - // Might encounter corruption err here - res = grouped.batch.insert_into(&*memtable); - } - match res { - Ok(_) => { - for signal in signals { - if let Err(e) = signal.send(Ok(())) { - error!( - "[process batch] Fail sending finshing signal to waiting batch: {}", e - ) + if res.is_ok() { + let memtable = db.mem.read().unwrap(); + // Might encounter corruption err here + res = grouped.batch.insert_into(&*memtable); + } + match res { + Ok(_) => { + for signal in signals { + if let Err(e) = signal.send(Ok(())) { + error!( + "[process batch] Fail sending finshing signal to waiting batch: {}", e + ) + } } } - } - Err(e) => { - warn!("[process batch] write batch failed: {}", e); - for signal in signals { - if let Err(e) = signal.send(Err(Error::Customized( - "[process batch] write batch failed".to_owned(), - ))) { - error!( - "[process batch] Fail sending finshing signal to waiting batch: {}", e - ) + Err(e) => { + warn!("[process batch] write batch failed: {}", e); + for signal in signals { + if let Err(e) = signal.send(Err(Error::Customized( + "[process batch] write batch failed".to_owned(), + ))) { + error!( + "[process batch] Fail sending finshing signal to waiting batch: {}", e + ) + } + } + if sync_err { + // The state of the log file is indeterminate: the log record we + // just added may or may not show up when the DB is re-opened. + // So we force the DB into a mode where all future writes fail. + db.record_bg_error(e); } } - if sync_err { - // The state of the log file is indeterminate: the log record we - // just added may or may not show up when the DB is re-opened. - // So we force the DB into a mode where all future writes fail. 
- db.record_bg_error(e); + } + versions.set_last_sequence(last_seq); + } else { + // Notify waiting batches + for signal in signals { + if let Err(e) = signal.send(Ok(())) { + error!( + "[process batch] Fail sending finshing signal to waiting batch: {}", e + ) } } } - versions.set_last_sequence(last_seq); } Err(e) => { - warn!("[process batch] no room for write requests: {}", e); - for signal in signals.iter() { - if let Err(e) = signal.send(Err(Error::Customized( - "[process batch] no room for write requests".to_owned(), - ))) { - error!( - "[process batch] fail to send finishing signal to waiting batch: {}", e - ) - } + if let Err(e) = first.signal.send(Err(Error::Customized(format!( + "[process batch] Error making room for write requests: {}", + e + )))) { + error!( + "[process batch] fail to send finishing signal to waiting batch: {}", e + ) } } } @@ -314,6 +311,7 @@ impl WickDB { let db = self.inner.clone(); thread::spawn(move || { while let Ok(()) = db.do_compaction.1.recv() { + dbg!("receive compaction"); if db.is_shutting_down.load(Ordering::Acquire) { // No more background work when shutting down break; @@ -327,7 +325,9 @@ impl WickDB { // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed - db.maybe_schedule_compaction(); + let current = db.versions.lock().unwrap().current(); + db.maybe_schedule_compaction(current); + dbg!("notify compaction finished"); db.background_work_finished_signal.notify_all(); } }); @@ -353,6 +353,8 @@ pub struct DBImpl { // The version set versions: Mutex>, + // ManualCompaction config + manual_compaction: ShardedLock>, // signal of compaction finished background_work_finished_signal: Condvar, @@ -398,6 +400,7 @@ impl DBImpl { process_batch_sem: Condvar::new(), table_cache: TableCache::new(db_name, o.clone(), o.table_cache_size(), storage.clone()), versions: Mutex::new(VersionSet::new(db_name, o.clone(), storage)), + manual_compaction: ShardedLock::new(None), background_work_finished_signal: Condvar::new(), background_compaction_scheduled: AtomicBool::new(false), do_compaction: crossbeam_channel::unbounded(), @@ -437,10 +440,11 @@ impl DBImpl { } } } + dbg!("nothing in memtable"); let current = self.versions.lock().unwrap().current(); let (value, seek_stats) = current.get(options, lookup_key, &self.table_cache)?; if current.update_stats(seek_stats) { - self.maybe_schedule_compaction() + self.maybe_schedule_compaction(current) } Ok(value) } @@ -448,14 +452,9 @@ impl DBImpl { // Record a sample of bytes read at the specified internal key // Might schedule a background compaction. fn record_read_sample(&self, internal_key: &[u8]) { - if self - .versions - .lock() - .unwrap() - .current() - .record_read_sample(internal_key) - { - self.maybe_schedule_compaction() + let current = self.versions.lock().unwrap().current(); + if current.record_read_sample(internal_key) { + self.maybe_schedule_compaction(current) } } @@ -594,7 +593,7 @@ impl DBImpl { // Read all the records and add to a memtable let mut mem = None; let mut record_buf = vec![]; - let mut batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); let mut max_sequence = 0; let mut have_compacted = false; // indicates that maybe we need while reader.read_record(&mut record_buf) { @@ -697,30 +696,75 @@ impl DBImpl { // Schedule the WriteBatch and wait for the result from the receiver. // This function wakes up the thread in `process_batch`. 
- fn schedule_batch_and_wait(&self, options: WriteOptions, batch: WriteBatch) -> Result<()> { + // An empty `WriteBatch` will trigger a force memtable compaction. + fn schedule_batch_and_wait( + &self, + options: WriteOptions, + batch: WriteBatch, + force_mem_compaction: bool, + ) -> Result<()> { if self.is_shutting_down.load(Ordering::Acquire) { return Err(Error::DBClosed("schedule WriteBatch".to_owned())); } - if batch.is_empty() { + if batch.is_empty() && !force_mem_compaction { return Ok(()); } let (send, recv) = crossbeam_channel::bounded(0); - let task = BatchTask::new(batch, send, options); + let task = BatchTask { + force_mem_compaction, + batch, + signal: send, + options, + }; self.batch_queue.lock().unwrap().push_back(task); self.process_batch_sem.notify_all(); - match recv.recv() { - Ok(m) => m, - Err(e) => Err(Error::RecvError(e)), + recv.recv().unwrap_or_else(|e| Err(Error::RecvError(e))) + } + + // Group a bunch of batches in the waiting queue + // This will ignore the task with `force_mem_compaction` after batched + fn group_batches(&self, first: BatchTask) -> (BatchTask, Vec>>) { + let mut size = first.batch.approximate_size(); + // Allow the group to grow up to a maximum size, but if the + // original write is small, limit the growth so we do not slow + // down the small write too much + let mut max_size = 1 << 20; + if size <= 128 << 10 { + max_size = size + (128 << 10) } + let mut signals = vec![]; + signals.push(first.signal.clone()); + let mut grouped = first; + + let mut queue = self.batch_queue.lock().unwrap(); + // Group several batches from queue + while !queue.is_empty() { + let current = queue.pop_front().unwrap(); + if current.options.sync && !grouped.options.sync { + // Do not include a sync write into a batch handled by a non-sync write. + queue.push_front(current); + break; + } + size += current.batch.approximate_size(); + if size > max_size { + // Do not make batch too big + break; + } + grouped.batch.append(current.batch); + signals.push(current.signal.clone()); + } + (grouped, signals) } // Make sure there is enough space in memtable. - // This method acquires the mutex of VersionSet and deliver it to the caller. + // This method acquires the mutex of `VersionSet` and deliver it to the caller. 
+ // The `force` flag is used for forcing to compact current memtable into level 0 + // sst files fn make_room_for_write(&self, mut force: bool) -> Result>> { let mut allow_delay = !force; let mut versions = self.versions.lock().unwrap(); loop { - if let Some(e) = { self.bg_error.write().unwrap().take() } { + if let Some(e) = self.take_bg_error() { return Err(e); } else if allow_delay && versions.level_files_count(0) >= self.options.l0_slowdown_writes_threshold @@ -754,13 +798,16 @@ impl DBImpl { versions.set_next_file_number(new_log_num + 1); versions.record_writer = Some(Writer::new(log_file)); // rotate the mem to immutable mem - let mut mem = self.mem.write().unwrap(); - let memtable = - mem::replace(&mut *mem, MemTable::new(self.internal_comparator.clone())); - let mut im_mem = self.im_mem.write().unwrap(); - *im_mem = Some(memtable); - force = false; // do not force another compaction if have room - self.maybe_schedule_compaction(); + { + let mut mem = self.mem.write().unwrap(); + let memtable = + mem::replace(&mut *mem, MemTable::new(self.internal_comparator.clone())); + let mut im_mem = self.im_mem.write().unwrap(); + *im_mem = Some(memtable); + force = false; // do not force another compaction if have room + } + dbg!("immutable rotate"); + self.maybe_schedule_compaction(versions.current()); } } Ok(versions) @@ -801,19 +848,99 @@ impl DBImpl { } } + // Force current memtable contents to be compacted into level 0 sst files + fn force_compact_mem_table(&self) -> Result<()> { + let empty_batch = WriteBatch::default(); + // Schedule a force memory compaction + self.schedule_batch_and_wait(WriteOptions::default(), empty_batch, true)?; + // Waiting for compaction finishing + let l = Mutex::new(()); + let _ = self.background_work_finished_signal.wait(l.lock().unwrap()); + + if self.im_mem.read().unwrap().is_some() { + return self.take_bg_error().map_or(Ok(()), |e| Err(e)); + } + Ok(()) + } + + // Compact the underlying storage for the key range `[begin, end]`. + // + // In particular, deleted and overwritten versions are discarded, + // and the data is rearranged to reduce the cost of operations + // needed to access the data. + // + // This operation should typically only be invoked by users + // who understand the underlying implementation. + // + // A `None` is treated as a key before all keys for `begin` + // and a key after all keys for `end` in the database. 
+ fn compact_range(&self, begin: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> { + let mut max_level_with_files = 1; + let versions = self.versions.lock().unwrap(); + let current = versions.current(); + for l in 1..self.options.max_levels as usize { + if current.overlap_in_level(l, begin, end) { + max_level_with_files = l; + } + } + self.force_compact_mem_table()?; + for l in 0..max_level_with_files { + self.manual_compact_range(l, begin, end) + } + Ok(()) + } + + // Schedules a manual compaction for the key range `[begin, end]` and waits util the + // compaction completes + fn manual_compact_range(&self, level: usize, begin: Option<&[u8]>, end: Option<&[u8]>) { + assert!(level + 1 < self.options.max_levels as usize); + let manual = ManualCompaction { + level, + done: Arc::new(AtomicBool::new(false)), + begin: begin.map(|k| InternalKey::new(k, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK)), + end: end.map(|k| InternalKey::new(k, 0, ValueType::Value)), + }; + + let l = Mutex::new(()); + // Waiting for next compaction window + while !manual.done.load(Ordering::Acquire) + && !self.background_compaction_scheduled.load(Ordering::Acquire) + && self.has_bg_error() + { + if self.manual_compaction.read().unwrap().is_none() { + *self.manual_compaction.write().unwrap() = Some(manual.clone()); + let versions = self.versions.lock().unwrap(); + self.maybe_schedule_compaction(versions.current()); + } else { + let _ = self + .background_work_finished_signal + .wait(l.lock().unwrap()) + .unwrap(); + } + } + if let Some(m) = &*self.manual_compaction.read().unwrap() { + if m == &manual { + *self.manual_compaction.write().unwrap() = None; + } + } + } + // The complete compaction process + // This is a sync function call fn background_compaction(&self) { if self.im_mem.read().unwrap().is_some() { + dbg!("compact memtable"); // minor compaction self.compact_mem_table(); + dbg!("compact memtable complete"); } else { let mut is_manual = false; let mut versions = self.versions.lock().unwrap(); if let Some(mut compaction) = { - match versions.manual_compaction.take() { - // manul compaction - Some(mut manual) => { - if manual.done { + match &*self.manual_compaction.read().unwrap() { + // manual compaction + Some(manual) => { + if manual.done.load(Ordering::Acquire) { versions.pick_compaction() } else { let compaction = versions.compact_range( @@ -821,7 +948,7 @@ impl DBImpl { manual.begin.as_ref(), manual.end.as_ref(), ); - manual.done = compaction.is_none(); + manual.done.store(compaction.is_none(), Ordering::Release); let begin = if let Some(begin) = &manual.begin { format!("{:?}", begin) } else { @@ -849,14 +976,13 @@ impl DBImpl { manual.level, begin, end, stop ); is_manual = true; - versions.manual_compaction = Some(manual); compaction } } None => versions.pick_compaction(), } } { - if is_manual && compaction.is_trivial_move() { + if !is_manual && compaction.is_trivial_move() { // just move file to next level let f = compaction.inputs[CompactionInputsRelation::Source as usize] .first() @@ -908,7 +1034,13 @@ impl DBImpl { } } if is_manual { - versions.manual_compaction.as_mut().unwrap().done = true; + self.manual_compaction + .read() + .unwrap() + .as_ref() + .unwrap() + .done + .store(true, Ordering::Release); } } } @@ -1065,32 +1197,40 @@ impl DBImpl { // Replace the `bg_error` with new `Error` if it's `None` fn record_bg_error(&self, e: Error) { - let old = self.bg_error.read().unwrap(); - if old.is_none() { - mem::drop(old); + if !self.has_bg_error() { let mut x = self.bg_error.write().unwrap(); *x = Some(e); 
self.background_work_finished_signal.notify_all(); } } + // Takes the background error + fn take_bg_error(&self) -> Option { + self.bg_error.write().unwrap().take() + } + + fn has_bg_error(&self) -> bool { + self.bg_error.read().unwrap().is_some() + } + // Check whether db needs to run a compaction. DB will run a compaction when: // 1. no background compaction is running // 2. DB is not shutting down // 3. no error has been encountered // 4. there is an immutable table or a manual compaction request or current version needs to be compacted - fn maybe_schedule_compaction(&self) { + fn maybe_schedule_compaction(&self, version: Arc) { if self.background_compaction_scheduled.load(Ordering::Acquire) // Already scheduled || self.is_shutting_down.load(Ordering::Acquire) // DB is being shutting down - || self.bg_error.read().unwrap().is_some() + || self.has_bg_error() // Got err || (self.im_mem.read().unwrap().is_none() - && !self.versions.lock().unwrap().needs_compaction()) + && self.manual_compaction.read().unwrap().is_none() && !version.needs_compaction()) { // No work needs to be done } else { + dbg!("send compaction signal"); self.background_compaction_scheduled .store(true, Ordering::Release); if let Err(e) = self.do_compaction.0.send(()) { @@ -1137,30 +1277,46 @@ impl DBImpl { } status } + + // Returns the approximate file system space used by keys in "[start .. end)" in + // level <= `n`. + // + // Note that the returned sizes measure file system space usage, so + // if the user data compresses by a factor of ten, the returned + // sizes will be one-tenth the size of the corresponding user data size. + // + // The results may not include the sizes of recently written data. + fn get_approximate_sizes(&self, start: &[u8], end: &[u8], n: usize) -> Vec { + let current = self.versions.lock().unwrap().current(); + let start_ikey = InternalKey::new(start, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK); + let end_ikey = InternalKey::new(end, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK); + (0..n) + .map(|_| { + let start = current.approximate_offset_of(&start_ikey, &self.table_cache); + let limit = current.approximate_offset_of(&end_ikey, &self.table_cache); + if limit >= start { + limit - start + } else { + 0 + } + }) + .collect::>() + } } // A wrapper struct for scheduling `WriteBatch` struct BatchTask { + force_mem_compaction: bool, batch: WriteBatch, signal: Sender>, options: WriteOptions, } -impl BatchTask { - fn new(batch: WriteBatch, signal: Sender>, options: WriteOptions) -> Self { - Self { - batch, - signal, - options, - } - } -} - -/// Build a Table file from the contents of `iter`. The generated file -/// will be named according to `meta.number`. On success, the rest of -/// meta will be filled with metadata about the generated table. -/// If no data is present in iter, `meta.file_size` will be set to -/// zero, and no Table file will be produced. +// Build a Table file from the contents of `iter`. The generated file +// will be named according to `meta.number`. On success, the rest of +// meta will be filled with metadata about the generated table. +// If no data is present in iter, `meta.file_size` will be set to +// zero, and no Table file will be produced. 
pub(crate) fn build_table( options: Arc, storage: &S, @@ -1213,3 +1369,329 @@ pub(crate) fn build_table( Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::mem::MemStorage; + use crate::{BloomFilter, CompressionType}; + use std::rc::Rc; + + impl WickDB { + fn options(&self) -> Arc { + self.inner.options.clone() + } + + fn files_count_at_level(&self, level: usize) -> usize { + self.inner.versions.lock().unwrap().level_files_count(level) + } + + fn total_sst_files(&self) -> usize { + let versions = self.inner.versions.lock().unwrap(); + let mut res = 0; + for l in 0..self.options().max_levels as usize { + res += versions.level_files_count(l); + } + res + } + + fn file_count_per_level(&self) -> String { + let mut res = String::new(); + let versions = self.inner.versions.lock().unwrap(); + for l in 0..self.options().max_levels as usize { + let count = versions.level_files_count(l); + res.push_str(&count.to_string()); + res.push(','); + } + res.trim_end_matches("0,").trim_end_matches(",").to_owned() + } + + fn wait_for_compaction_complete(&self) -> Receiver<()> { + let db = self.inner.clone(); + let (sender, recv) = crossbeam_channel::bounded(0); + thread::spawn(move || { + let l = db.versions.lock().unwrap(); + let _ = db.background_work_finished_signal.wait(l); + sender.send(()).unwrap(); + }); + recv + } + } + #[derive(Debug, Clone, Copy, FromPrimitive)] + enum TestOption { + Default = 1, + // Enable `reuse_log` + Reuse = 2, + // Use Bloom Filter as the filter policy + FilterPolicy = 3, + // No compression enabled + UnCompressed = 4, + } + + impl From for TestOption { + fn from(src: u8) -> TestOption { + num_traits::FromPrimitive::from_u8(src).unwrap() + } + } + + fn new_test_options(o: TestOption) -> Options { + match o { + TestOption::Default => Options::default(), + TestOption::Reuse => { + let mut o = Options::default(); + o.reuse_logs = true; + o + } + TestOption::FilterPolicy => { + let filter = BloomFilter::new(10); + let mut o = Options::default(); + o.filter_policy = Some(Rc::new(filter)); + o + } + TestOption::UnCompressed => { + let mut o = Options::default(); + o.compression = CompressionType::NoCompression; + o + } + } + } + struct DBTest { + store: MemStorage, // With the same db's inner storage + db: WickDB, + } + + fn iter_to_string(iter: &dyn Iterator) -> String { + if iter.valid() { + format!("{:?}->{:?}", iter.key(), iter.value()) + } else { + "(invalid)".to_owned() + } + } + + fn default_cases() -> Vec { + cases(|opt| opt) + } + + fn cases(mut opt_hook: F) -> Vec + where + F: FnMut(Options) -> Options, + { + vec![ + TestOption::Default, + TestOption::Reuse, + TestOption::FilterPolicy, + TestOption::UnCompressed, + ] + .into_iter() + .map(|opt| { + let options = opt_hook(new_test_options(opt)); + DBTest::new(options) + }) + .collect() + } + + impl DBTest { + fn new(opt: Options) -> Self { + let store = MemStorage::default(); + let name = "db_test"; + let db = WickDB::open_db(opt, name, store.clone()).unwrap(); + DBTest { store, db } + } + + fn put(&self, k: &str, v: &str) -> Result<()> { + self.db + .put(WriteOptions::default(), k.as_bytes(), v.as_bytes()) + } + + fn delete(&self, k: &str) -> Result<()> { + self.db.delete(WriteOptions::default(), k.as_bytes()) + } + + fn get(&self, k: &str, snapshot: Option) -> Option { + let mut read_opt = ReadOptions::default(); + read_opt.snapshot = snapshot; + match self.db.get(read_opt, k.as_bytes()) { + Ok(v) => v.and_then(|v| Some(v.as_str().to_owned())), + Err(_) => None, + } + } + + // Return a string 
that contains all key,value pairs in order, + // formatted like "(k1->v1)(k2->v2)...". + // Also checks the db iterator works fine in both forward and backward direction + fn assert_contents(&self) -> String { + let mut iter = self.db.iter(ReadOptions::default()); + iter.seek_to_first(); + let mut result = String::new(); + let mut backward = vec![]; + while iter.valid() { + let s = iter_to_string(&iter); + result.push('('); + result.push_str(&s); + result.push(')'); + backward.push(s); + iter.next(); + } + + // Chech reverse iteration results are reverse of forward results + backward.reverse(); + iter.seek_to_last(); + let mut matched = 0; + while iter.valid() { + assert!(matched < backward.len()); + assert_eq!(iter_to_string(&iter), backward[matched]); + iter.prev(); + matched += 1 + } + assert_eq!(matched, backward.len()); + result + } + + // Return all the values for the given `user_key` + fn all_entires_for(&self, user_key: &[u8]) -> String { + let mut iter = self.db.iter(ReadOptions::default()); + let ikey = InternalKey::new(user_key, MAX_KEY_SEQUENCE, ValueType::Value); + iter.seek(ikey.data()); + let mut result = String::new(); + if iter.valid() { + result.push_str("[ "); + let mut first = true; + while iter.valid() { + match ParsedInternalKey::decode_from(iter.key().as_slice()) { + None => result.push_str("CORRUPTED"), + Some(pkey) => { + if self + .db + .options() + .comparator + .compare(&pkey.user_key, user_key) + != CmpOrdering::Equal + { + break; + } + if !first { + result.push_str(", "); + } + first = false; + match pkey.value_type { + ValueType::Value => result.push_str(iter.value().as_str()), + ValueType::Deletion => result.push_str("DEL"), + ValueType::Unknown => result.push_str("UNKNOWN"), + } + } + } + iter.next(); + } + if !first { + result.push_str(" "); + } + result.push_str("]"); + } else { + result = iter.status().unwrap_err().to_string(); + } + result + } + + fn compact(&self, begin: Option<&[u8]>, end: Option<&[u8]>) { + self.db.inner.compact_range(begin, end).unwrap() + } + + // Do `n` memtable compactions, each of which produces an sstable + // covering the key range `[begin,end]`. 
+ fn make_sst_files(&self, n: usize, begin: &str, end: &str) { + for _ in 0..n { + self.put(begin, "begin").unwrap(); + self.put(end, "end").unwrap(); + self.db.inner.force_compact_mem_table().unwrap(); + } + } + + // Prevent pushing of new sstables into deeper levels by adding + // tables that cover a specified range to all levels + fn fill_levels(&self, begin: &str, end: &str) { + self.make_sst_files(self.db.options().max_levels as usize, begin, end) + } + + fn assert_put_get(&self, key: &str, value: &str) { + self.put(key, value).unwrap(); + assert_eq!(value, self.get(key, None).unwrap()); + } + } + + impl Default for DBTest { + fn default() -> Self { + let store = MemStorage::default(); + let name = "db_test"; + let opt = new_test_options(TestOption::Default); + let db = WickDB::open_db(opt, name, store.clone()).unwrap(); + DBTest { store, db } + } + } + + #[test] + fn test_empty_db() { + for t in default_cases() { + assert_eq!(None, t.get("foo", None)) + } + } + + #[test] + fn test_empty_key() { + for t in default_cases() { + t.assert_put_get("", "v1"); + t.assert_put_get("", "v2"); + } + } + + #[test] + fn test_empty_value() { + for t in default_cases() { + t.assert_put_get("key", "v1"); + t.assert_put_get("key", ""); + t.assert_put_get("key", "v2"); + } + } + + #[test] + fn test_read_write() { + for t in default_cases() { + t.assert_put_get("foo", "v1"); + t.put("bar", "v2").unwrap(); + t.put("foo", "v3").unwrap(); + assert_eq!("v3", t.get("foo", None).unwrap()); + assert_eq!("v2", t.get("bar", None).unwrap()); + } + } + + #[test] + fn test_put_delete_get() { + for t in default_cases() { + t.assert_put_get("foo", "v1"); + t.assert_put_get("foo", "v2"); + t.delete("foo").unwrap(); + assert_eq!(None, t.get("foo", None)); + } + } + + #[test] + fn test_get_from_immutable_layer() { + for t in cases(|mut opt| { + opt.write_buffer_size = 100000; // Small write buffer + opt + }) { + t.assert_put_get("foo", "v1"); + // block `flush()` + let r = t.db.wait_for_compaction_complete(); + t.store.delay_data_sync.store(true, Ordering::Release); + dbg!("k1"); + t.put("k1", &"x".repeat(100000)).unwrap(); // fill memtable + dbg!("k2"); + t.put("k2", &"y".repeat(100000)).unwrap(); // trigger compaction + // Waiting for compaction finish + r.recv().unwrap(); + // Try to retrieve key "foo" from level 0 files + assert_eq!("v1", dbg!(t.get("foo", None)).unwrap()); + dbg!("end"); + } + } +} diff --git a/src/record/mod.rs b/src/record/mod.rs index 6183e39..338de2a 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -311,7 +311,6 @@ mod tests { } pub fn match_error(&self, msg: &str) -> bool { - dbg!(self.reporter.message.borrow()); match self.reporter.message.borrow().find(msg) { Some(_) => true, None => false, diff --git a/src/record/writer.rs b/src/record/writer.rs index 8504d61..ac75309 100644 --- a/src/record/writer.rs +++ b/src/record/writer.rs @@ -133,7 +133,7 @@ impl Writer { // write the header and the data self.dest.write(&buf)?; self.dest.write(data)?; - self.dest.flush()?; + // self.dest.flush()?; // update block_offset self.block_offset += HEADER_SIZE + size; Ok(()) diff --git a/src/sstable/mod.rs b/src/sstable/mod.rs index 43c3188..b4910ae 100644 --- a/src/sstable/mod.rs +++ b/src/sstable/mod.rs @@ -744,7 +744,7 @@ mod tests { data: &[(Vec, Vec)], ) -> Result<()> { for (key, value) in data.iter() { - let mut batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); batch.put(key.as_slice(), value.as_slice()); self.inner .write(WriteOptions::default(), batch) diff --git 
a/src/sstable/table.rs b/src/sstable/table.rs index e6ae5b9..07b657a 100644 --- a/src/sstable/table.rs +++ b/src/sstable/table.rs @@ -188,7 +188,6 @@ impl Table { /// E.g., the approximate offset of the last key in the table will /// be close to the file length. /// Temporary only used in tests. - #[allow(dead_code)] pub(crate) fn approximate_offset_of(&self, key: &[u8]) -> u64 { let mut index_iter = self.index_block.iter(self.options.comparator.clone()); index_iter.seek(key); diff --git a/src/storage/mem.rs b/src/storage/mem.rs index b04b8f0..127a9bf 100644 --- a/src/storage/mem.rs +++ b/src/storage/mem.rs @@ -41,25 +41,25 @@ pub struct MemStorage { inner: Arc>>, // ---- Parameters for fault injection - // sstable/log `flush()` calls are blocked. - delay_data_sync: Arc, + /// sstable/log `flush()` calls are blocked. + pub delay_data_sync: Arc, - // sstable/log `flush()` calls return an error - data_sync_error: Arc, + /// sstable/log `flush()` calls return an error + pub data_sync_error: Arc, - // Simulate no-space errors - no_space: Arc, + /// Simulate no-space errors + pub no_space: Arc, - // Simulate non-writable file system - non_writable: Arc, + /// Simulate non-writable file system + pub non_writable: Arc, - // Force sync of manifest files to fail - manifest_sync_error: Arc, + /// Force sync of manifest files to fail + pub manifest_sync_error: Arc, - // Force write to manifest files to fail - manifest_write_error: Arc, + /// Force write to manifest files to fail + pub manifest_write_error: Arc, - count_random_reads: bool, + pub count_random_reads: bool, } impl Default for MemStorage { @@ -154,7 +154,12 @@ impl Storage for MemStorage { let path = clean(name); self.is_ok_to_create(path.as_path())?; let name = path.to_str().unwrap().to_owned(); - let file_node = FileNode::new(&name); + let mut file_node = FileNode::new(&name); + file_node.delay_data_sync = self.delay_data_sync.clone(); + file_node.data_sync_error = self.data_sync_error.clone(); + file_node.no_space = self.no_space.clone(); + file_node.manifest_sync_error = self.manifest_sync_error.clone(); + file_node.manifest_write_error = self.manifest_write_error.clone(); self.inner .write() .unwrap() diff --git a/src/table_cache.rs b/src/table_cache.rs index d6d4ee2..8499dcd 100644 --- a/src/table_cache.rs +++ b/src/table_cache.rs @@ -48,11 +48,18 @@ impl TableCache { } } + /// Returns + pub fn get_table(&self, file_number: u64, file_size: u64) -> Result>> { + let handle = self.find_table(file_number, file_size)?; + // The `unwrap()` must be safe + Ok(handle.value().unwrap()) + } + // Try to find the sst file from cache. If not found, try to find the file from storage and insert it into the cache fn find_table(&self, file_number: u64, file_size: u64) -> Result>>> { let mut key = vec![]; VarintU64::put_varint(&mut key, file_number); - match self.cache.look_up(key.as_slice()) { + match self.cache.look_up(&key) { Some(handle) => Ok(handle), None => { let filename = generate_filename(self.db_name, FileType::Table, file_number); diff --git a/src/version/mod.rs b/src/version/mod.rs index 5811934..616f4f8 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -16,7 +16,7 @@ // found in the LICENSE file. 
use crate::db::format::{ - InternalKey, InternalKeyComparator, LookupKey, ParsedInternalKey, ValueType, + InternalKey, InternalKeyComparator, LookupKey, ParsedInternalKey, ValueType, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK, }; use crate::iterator::Iterator; @@ -135,13 +135,14 @@ impl Version { // overlap user_key and process them in order from newest to oldest because // the last level-0 file always has the newest entries. for f in files.iter().rev() { - if ucmp.compare(ukey, f.largest.data()) != CmpOrdering::Greater - && ucmp.compare(ukey, f.smallest.data()) != CmpOrdering::Less + if ucmp.compare(ukey, f.largest.user_key()) != CmpOrdering::Greater + && ucmp.compare(ukey, f.smallest.user_key()) != CmpOrdering::Less { files_to_seek.push(f.clone()); } } files_to_seek.sort_by(|a, b| b.number.cmp(&a.number)); + dbg!(files_to_seek.len()); } else { let index = Self::find_file(self.icmp.clone(), self.files[level].as_slice(), &ikey); if index >= files.len() { @@ -202,6 +203,11 @@ impl Version { false } + /// Whether the version needs to be compacted + pub fn needs_compaction(&self) -> bool { + self.compaction_score > 1.0 || self.file_to_compact.read().unwrap().is_some() + } + /// Return a String includes number of files in every level pub fn level_summary(&self) -> String { let mut s = String::from("files[ "); @@ -247,14 +253,14 @@ impl Version { largest_ukey: &[u8], ) -> usize { let mut level = 0; - if !self.overlap_in_level(level, smallest_ukey, largest_ukey) { + if !self.overlap_in_level(level, Some(smallest_ukey), Some(largest_ukey)) { // No overlapping in level 0 // we might directly push files to next level if there is no overlap in next level let smallest_ikey = - InternalKey::new(smallest_ukey, u64::max_value(), VALUE_TYPE_FOR_SEEK); + InternalKey::new(smallest_ukey, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK); let largest_ikey = InternalKey::new(largest_ukey, 0, ValueType::Deletion); while level < self.options.max_mem_compact_level { - if self.overlap_in_level(level + 1, smallest_ukey, largest_ukey) { + if self.overlap_in_level(level + 1, Some(smallest_ukey), Some(largest_ukey)) { break; } if level + 2 < self.options.max_levels as usize { @@ -419,17 +425,23 @@ impl Version { false } - // Returns true iff some file in the specified level overlaps - // some part of `[smallest_ukey,largest_ukey]`. - // `smallest_ukey` is empty represents a key smaller than all the DB's keys. - // `largest_ukey` is empty represents a key largest than all the DB's keys. - fn overlap_in_level(&self, level: usize, smallest_ukey: &[u8], largest_ukey: &[u8]) -> bool { + /// Returns true iff some file in the specified level overlaps + /// some part of `[smallest_ukey,largest_ukey]`. + /// `smallest_ukey` is `None` represents a key smaller than all the DB's keys. + /// `largest_ukey` is `None` represents a key largest than all the DB's keys. 
+ pub fn overlap_in_level( + &self, + level: usize, + smallest_ukey: Option<&[u8]>, + largest_ukey: Option<&[u8]>, + ) -> bool { if level == 0 { // need to check against all files in level 0 for file in self.files[0].iter() { if self.key_is_after_file(file.clone(), smallest_ukey) || self.key_is_before_file(file.clone(), largest_ukey) { + // No overlap continue; } else { return true; @@ -439,9 +451,8 @@ impl Version { } // binary search in level > 0 let index = { - if !smallest_ukey.is_empty() { - let smallest_ikey = - InternalKey::new(smallest_ukey, u64::max_value(), VALUE_TYPE_FOR_SEEK); + if let Some(s_ukey) = smallest_ukey { + let smallest_ikey = InternalKey::new(s_ukey, MAX_KEY_SEQUENCE, VALUE_TYPE_FOR_SEEK); Self::find_file(self.icmp.clone(), &self.files[level], smallest_ikey.data()) } else { 0 @@ -455,21 +466,56 @@ impl Version { !self.key_is_before_file(self.files[level][index].clone(), largest_ukey) } - fn key_is_after_file(&self, file: Arc, ukey: &[u8]) -> bool { - !ukey.is_empty() + /// Return the approximate offset in the database of the data for + /// given `ikey` in this version + pub fn approximate_offset_of( + &self, + ikey: &InternalKey, + table_cache: &TableCache, + ) -> u64 { + let mut result = 0; + for (level, files) in self.files.iter().enumerate() { + for f in files { + if self.icmp.compare(f.largest.data(), ikey.data()) != CmpOrdering::Greater { + // Entire file is before "ikey", so just add the file size + result += f.file_size; + } else if self.icmp.compare(f.smallest.data(), ikey.data()) == CmpOrdering::Greater + { + // Entire file is after "ikey", so ignore + if level > 0 { + // Files other than level 0 are sorted by `smallest`, so + // no further files in this level will contain data for + // "ikey" + break; + } + } else { + // "ikey" falls in the range for this table. Add the + // approximate offset of "ikey" within the table. + if let Ok(table) = table_cache.get_table(f.number, f.file_size) { + result += table.approximate_offset_of(ikey.data()); + } + } + } + } + result + } + // used for smallest user key + fn key_is_after_file(&self, file: Arc, ukey: Option<&[u8]>) -> bool { + ukey.is_some() && self .icmp .user_comparator - .compare(ukey, file.largest.user_key()) + .compare(ukey.unwrap(), file.largest.user_key()) == CmpOrdering::Greater } - fn key_is_before_file(&self, file: Arc, ukey: &[u8]) -> bool { - !ukey.is_empty() + // used for biggest user key + fn key_is_before_file(&self, file: Arc, ukey: Option<&[u8]>) -> bool { + ukey.is_some() && self .icmp .user_comparator - .compare(ukey, file.smallest.user_key()) + .compare(ukey.unwrap(), file.smallest.user_key()) == CmpOrdering::Less } diff --git a/src/version/version_set.rs b/src/version/version_set.rs index 912e9ed..97b9ceb 100644 --- a/src/version/version_set.rs +++ b/src/version/version_set.rs @@ -15,7 +15,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. 
-use crate::compaction::{base_range, total_range, Compaction, CompactionStats, ManualCompaction}; +use crate::compaction::{base_range, total_range, Compaction, CompactionStats}; use crate::db::build_table; use crate::db::filename::{generate_filename, parse_filename, update_current, FileType}; use crate::db::format::{InternalKey, InternalKeyComparator}; @@ -158,8 +158,6 @@ pub struct VersionSet { pub compaction_stats: Vec, // Set of table files to protect from deletion because they are part of ongoing compaction pub pending_outputs: HashSet, - // Represent a manual compaction, temporarily just for test - pub manual_compaction: Option, // WAL writer pub record_writer: Option>, @@ -199,7 +197,6 @@ impl VersionSet { snapshots: SnapshotList::new(), compaction_stats, pending_outputs: HashSet::default(), - manual_compaction: None, db_name, storage, record_writer: None, @@ -219,7 +216,8 @@ impl VersionSet { #[inline] pub fn level_files_count(&self, level: usize) -> usize { assert!(level < self.options.max_levels as usize); - self.versions.front().unwrap().files[level].len() + let level_files = &self.versions.front().unwrap().files; + level_files.get(level).map_or(0, |files| files.len()) } /// Returns `prev_log_number` @@ -240,17 +238,6 @@ impl VersionSet { self.log_number = log_num; } - /// Whether the current version needs to be compacted - #[inline] - pub fn needs_compaction(&self) -> bool { - if self.manual_compaction.is_some() { - true - } else { - let current = self.current(); - current.compaction_score > 1.0 || current.file_to_compact.read().unwrap().is_some() - } - } - /// Returns the next file number #[inline] pub fn get_next_file_number(&self) -> u64 {
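The batch.rs changes above lean on the fixed 12-byte WriteBatch header: an 8-byte sequence number followed by a 4-byte entry count, which is why `is_empty` now checks `get_count() == 0` instead of the buffer length. A minimal standalone sketch, assuming the usual LevelDB-style little-endian fixed encoding; the `Batch` type and helpers here are illustrative stand-ins, not wickdb's own API:

use std::convert::TryInto;

// Sketch of the 12-byte header at the front of a WriteBatch buffer:
//   bytes [0..8)  -> sequence number (fixed64, little-endian)
//   bytes [8..12) -> number of entries (fixed32, little-endian)
const HEADER_SIZE: usize = 12;

struct Batch {
    contents: Vec<u8>,
}

impl Default for Batch {
    fn default() -> Self {
        // A fresh batch is an all-zero header: sequence = 0, count = 0.
        Batch { contents: vec![0; HEADER_SIZE] }
    }
}

impl Batch {
    fn set_sequence(&mut self, seq: u64) {
        self.contents[..8].copy_from_slice(&seq.to_le_bytes());
    }
    fn get_sequence(&self) -> u64 {
        u64::from_le_bytes(self.contents[..8].try_into().unwrap())
    }
    fn set_count(&mut self, count: u32) {
        self.contents[8..12].copy_from_slice(&count.to_le_bytes());
    }
    fn get_count(&self) -> u32 {
        u32::from_le_bytes(self.contents[8..12].try_into().unwrap())
    }
    // The header is always present, so "empty" means "no entries", not "no bytes".
    fn is_empty(&self) -> bool {
        self.get_count() == 0
    }
}

fn main() {
    let mut b = Batch::default();
    assert!(b.is_empty());
    b.set_sequence(200);
    b.set_count(3);
    assert_eq!(b.get_sequence(), 200);
    assert_eq!(b.get_count(), 3);
    assert!(!b.is_empty());
}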
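The write-group sizing that moved into `group_batches` caps a group at 1 MiB, but if the leading batch is small (at most 128 KiB) growth is limited to the leader's size plus 128 KiB, so one small write is not slowed down by its followers. A rough illustration over plain byte counts; the function names are made up, and the real code additionally stops grouping when a sync write would be folded into a non-sync group:

// Sketch of the group-size cap used when batching queued writes.
fn max_group_size(first_size: usize) -> usize {
    if first_size <= 128 << 10 {
        // Small leading write: allow at most another 128 KiB of followers.
        first_size + (128 << 10)
    } else {
        // Otherwise allow the group to grow up to 1 MiB.
        1 << 20
    }
}

// Returns how many of the pending batch sizes would be folded into the group.
fn group(first_size: usize, pending: &[usize]) -> usize {
    let max = max_group_size(first_size);
    let mut total = first_size;
    let mut taken = 0;
    for size in pending {
        if total + size > max {
            break; // do not make the group too big
        }
        total += size;
        taken += 1;
    }
    taken
}

fn main() {
    // A 1 KiB leader can absorb roughly 128 KiB of followers...
    assert_eq!(group(1 << 10, &[64 << 10, 64 << 10, 64 << 10]), 2);
    // ...while a large leader is capped at 1 MiB overall.
    assert_eq!(group(900 << 10, &[100 << 10, 100 << 10]), 1);
}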
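`ManualCompaction` now derives `Clone` and shares its `done` flag between the requesting thread and the background compaction thread (the generic parameters are garbled in the quoted diff, but the shared flag is an atomic boolean behind an `Arc`). A minimal sketch of that hand-off with simplified fields:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

#[derive(Clone)]
struct ManualCompaction {
    level: usize,
    // Shared with the background thread; the Release store pairs with the
    // Acquire load so the requester observes completion.
    done: Arc<AtomicBool>,
}

fn main() {
    let manual = ManualCompaction {
        level: 0,
        done: Arc::new(AtomicBool::new(false)),
    };
    // The compaction thread works on its own clone but the same flag.
    let for_worker = manual.clone();
    let worker = thread::spawn(move || {
        // ... run the compaction for `for_worker.level` here ...
        for_worker.done.store(true, Ordering::Release);
    });
    worker.join().unwrap();
    assert!(manual.done.load(Ordering::Acquire));
    println!("manual compaction at level {} finished", manual.level);
}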
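`overlap_in_level` and its helpers switch from empty-slice sentinels to `Option<&[u8]>`, where `None` stands for a key before (or after) every key in the database. A toy version of the overlap test under that convention, using plain byte-slice ordering in place of the user comparator; the names mirror the patch but the bodies are simplified:

// `None` plays the role of "before all keys" for the smallest bound and
// "after all keys" for the largest bound, replacing the old empty-slice sentinel.
fn key_is_after_file(file_largest: &[u8], smallest_ukey: Option<&[u8]>) -> bool {
    smallest_ukey.map_or(false, |k| k > file_largest)
}

fn key_is_before_file(file_smallest: &[u8], largest_ukey: Option<&[u8]>) -> bool {
    largest_ukey.map_or(false, |k| k < file_smallest)
}

// A file overlaps the range unless the whole range lies strictly on one side of it.
fn overlaps(file: (&[u8], &[u8]), smallest: Option<&[u8]>, largest: Option<&[u8]>) -> bool {
    let (file_smallest, file_largest) = file;
    !(key_is_after_file(file_largest, smallest) || key_is_before_file(file_smallest, largest))
}

fn main() {
    let file = ("b".as_bytes(), "d".as_bytes());
    assert!(overlaps(file, Some("c".as_bytes()), Some("z".as_bytes()))); // range starts inside the file
    assert!(!overlaps(file, None, Some("a".as_bytes()))); // range lies entirely before the file
    assert!(overlaps(file, None, None)); // an unbounded range overlaps everything
}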