*: use associated type File for Storage (#35)
Signed-off-by: Fullstop000 <[email protected]>
Fullstop000 authored Jan 13, 2020
1 parent ac2c819 commit 4654160
Showing 16 changed files with 190 additions and 194 deletions.
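
The theme of this commit: `Storage` previously handed out file handles as `Box<dyn File>` trait objects; now the concrete file type is an associated type on `Storage` (`type F: File`), and downstream types such as `Compaction`, `Logger`, `Reader`, `Writer`, and `TableBuilder` carry it as a generic parameter or name it as `S::F`. A minimal sketch of the before/after trait shape — the names and signatures here are illustrative assumptions, not wickdb's exact API:

```rust
use std::io::Result;

// Illustrative stand-ins for the crate's traits.
pub trait File {
    fn write(&mut self, buf: &[u8]) -> Result<usize>;
    fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
}

// Before: every open file is a boxed trait object, so each call is
// dynamically dispatched and each handle costs a heap allocation.
pub trait StorageBefore {
    fn create(&self, name: &str) -> Result<Box<dyn File>>;
}

// After: the file type is an associated type; `S::F` is a concrete type
// everywhere, so calls are statically dispatched and unboxed.
pub trait Storage {
    type F: File;
    fn create(&self, name: &str) -> Result<Self::F>;
}
```

The hunks below are mostly the mechanical fallout of that change: struct fields like `db_lock: Option<Box<dyn File>>` become `db_lock: Option<S::F>`, and `Box::new(...)` wrapping disappears at call sites.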
117 changes: 58 additions & 59 deletions src/compaction.rs
@@ -19,7 +19,7 @@ use crate::db::format::{InternalKey, InternalKeyComparator};
use crate::iterator::{ConcatenateIterator, Iterator, MergingIterator};
use crate::options::{Options, ReadOptions};
use crate::sstable::table::TableBuilder;
-use crate::storage::Storage;
+use crate::storage::{File, Storage};
use crate::table_cache::TableCache;
use crate::util::comparator::Comparator;
use crate::version::version_edit::{FileMetaData, VersionEdit};
@@ -44,7 +44,7 @@ pub enum CompactionInputsRelation {
}

/// A Compaction encapsulates information about a compaction
-pub struct Compaction {
+pub struct Compaction<F: File> {
options: Arc<Options>,
// Target level to be compacted
pub level: usize,
@@ -84,13 +84,13 @@ pub struct Compaction {
// current table builder for the output sst file
// we rotate to a new builder when the input hits
// `should_stop_before`
-pub builder: Option<TableBuilder>,
+pub builder: Option<TableBuilder<F>>,

// total bytes that have been written
pub total_bytes: u64,
}

-impl Compaction {
+impl<O: File> Compaction<O> {
pub fn new(options: Arc<Options>, level: usize) -> Self {
let max_levels = options.max_levels as usize;
let mut level_ptrs = Vec::with_capacity(max_levels);
@@ -115,61 +115,6 @@ impl Compaction {
}
}

-/// Returns the minimal range that covers all entries in `files`
-pub fn base_range<'a>(
-files: &'a [Arc<FileMetaData>],
-level: usize,
-icmp: &InternalKeyComparator,
-) -> (&'a InternalKey, &'a InternalKey) {
-assert!(
-!files.is_empty(),
-"[compaction] the input[0] shouldn't be empty when trying to get covered range"
-);
-if level == 0 {
-// level 0 files may overlap with each other
-let mut smallest = &files.first().unwrap().smallest;
-let mut largest = &files.last().unwrap().largest;
-for f in files.iter().skip(1) {
-if icmp.compare(f.smallest.data(), smallest.data()) == CmpOrdering::Less {
-smallest = &f.smallest;
-}
-if icmp.compare(f.largest.data(), largest.data()) == CmpOrdering::Greater {
-largest = &f.largest;
-}
-}
-(smallest, largest)
-} else {
-// no overlapping in levels > 0 and files are ordered by smallest key
-(
-&files.first().unwrap().smallest,
-&files.last().unwrap().largest,
-)
-}
-}
-
-/// Returns the minimal range that covers all key ranges in `current_l_files` and `next_l_files`
-/// `current_l_files` means the current level's files to be compacted
-/// `next_l_files` means the next level's files to be compacted
-pub fn total_range<'a>(
-current_l_files: &'a [Arc<FileMetaData>],
-next_l_files: &'a [Arc<FileMetaData>],
-level: usize,
-icmp: &InternalKeyComparator,
-) -> (&'a InternalKey, &'a InternalKey) {
-let (mut smallest, mut largest) = Self::base_range(current_l_files, level, icmp);
-if !next_l_files.is_empty() {
-let first = next_l_files.first().unwrap();
-if icmp.compare(first.smallest.data(), smallest.data()) == CmpOrdering::Less {
-smallest = &first.smallest
-}
-let last = next_l_files.last().unwrap();
-if icmp.compare(last.largest.data(), largest.data()) == CmpOrdering::Greater {
-largest = &last.largest
-}
-}
-(smallest, largest)
-}

/// Is this a trivial compaction that can be implemented by just
/// moving a single input file to the next level (no merging or splitting)
pub fn is_trivial_move(&self) -> bool {
@@ -310,6 +255,60 @@
}
}

+/// Returns the minimal range that covers all entries in `files`
+pub fn base_range<'a>(
+files: &'a [Arc<FileMetaData>],
+level: usize,
+icmp: &InternalKeyComparator,
+) -> (&'a InternalKey, &'a InternalKey) {
+assert!(
+!files.is_empty(),
+"[compaction] the input[0] shouldn't be empty when trying to get covered range"
+);
+if level == 0 {
+// level 0 files may overlap with each other
+let mut smallest = &files.first().unwrap().smallest;
+let mut largest = &files.last().unwrap().largest;
+for f in files.iter().skip(1) {
+if icmp.compare(f.smallest.data(), smallest.data()) == CmpOrdering::Less {
+smallest = &f.smallest;
+}
+if icmp.compare(f.largest.data(), largest.data()) == CmpOrdering::Greater {
+largest = &f.largest;
+}
+}
+(smallest, largest)
+} else {
+// no overlapping in levels > 0 and files are ordered by smallest key
+(
+&files.first().unwrap().smallest,
+&files.last().unwrap().largest,
+)
+}
+}
+
+/// Returns the minimal range that covers all key ranges in `current_l_files` and `next_l_files`
+/// `current_l_files` means the current level's files to be compacted
+/// `next_l_files` means the next level's files to be compacted
+pub fn total_range<'a>(
+current_l_files: &'a [Arc<FileMetaData>],
+next_l_files: &'a [Arc<FileMetaData>],
+level: usize,
+icmp: &InternalKeyComparator,
+) -> (&'a InternalKey, &'a InternalKey) {
+let (mut smallest, mut largest) = base_range(current_l_files, level, icmp);
+if !next_l_files.is_empty() {
+let first = next_l_files.first().unwrap();
+if icmp.compare(first.smallest.data(), smallest.data()) == CmpOrdering::Less {
+smallest = &first.smallest
+}
+let last = next_l_files.last().unwrap();
+if icmp.compare(last.largest.data(), largest.data()) == CmpOrdering::Greater {
+largest = &last.largest
+}
+}
+(smallest, largest)
+}

/// A helper struct for recording the statistics in compactions
pub struct CompactionStats {
micros: u64,
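
A note on the hunks above: `base_range` and `total_range` moved out of the `impl` block and became free functions. They never touch the new type parameter, so as inherent methods of the now-generic `Compaction<F: File>` every caller would have had to pin down an irrelevant `F`. A self-contained sketch of that motivation (stand-in types, not the crate's):

```rust
trait File {}
struct MemFile; // illustrative stand-in
impl File for MemFile {}

struct Compaction<F: File> {
    outputs: Vec<F>,
}

impl<F: File> Compaction<F> {
    // Helper that ignores `F`: as a method, callers still have to name one.
    fn sum_as_method(xs: &[u64]) -> u64 {
        xs.iter().sum()
    }
}

// The same helper as a free function needs no type parameter at all.
fn sum_as_free_fn(xs: &[u64]) -> u64 {
    xs.iter().sum()
}

fn main() {
    let xs = [1, 2, 3];
    let _a = Compaction::<MemFile>::sum_as_method(&xs); // arbitrary F forced on caller
    let _b = sum_as_free_fn(&xs); // no F involved
}
```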
2 changes: 1 addition & 1 deletion src/db/filename.rs
@@ -107,7 +107,7 @@ pub fn parse_filename<P: AsRef<Path>>(filename: P) -> Option<(FileType, u64)> {
}

/// Update the CURRENT file to point to new MANIFEST file
-pub fn update_current(env: &dyn Storage, dbname: &str, manifest_file_num: u64) -> Result<()> {
+pub fn update_current<S: Storage>(env: &S, dbname: &str, manifest_file_num: u64) -> Result<()> {
// Remove leading "dbname/" and add newline to manifest file name
let mut manifest = generate_filename(dbname, FileType::Manifest, manifest_file_num);
manifest.drain(0..=dbname.len());
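
`update_current` switches from `&dyn Storage` to a generic `<S: Storage>` because, with an associated type in play, `dyn Storage` alone is no longer a complete type — a trait object would have to pin the associated type, as in `&dyn Storage<F = SomeFile>`. A sketch of the two spellings (trait shapes are assumptions):

```rust
trait File {}
trait Storage {
    type F: File;
}

// Trait-object form: the associated type must be named explicitly.
fn via_trait_object<Fh: File>(_env: &dyn Storage<F = Fh>) {}

// Generic form used by the commit: `S::F` is inferred from `S`.
fn via_generic<S: Storage>(_env: &S) {}
```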
19 changes: 8 additions & 11 deletions src/db/mod.rs
@@ -86,6 +86,7 @@ pub trait DB {

/// The wrapper of `DBImpl` for concurrency control.
/// `WickDB` is thread safe and can be shared across threads via `clone()`.
+#[derive(Clone)]
pub struct WickDB<S: Storage + Clone + 'static> {
inner: Arc<DBImpl<S>>,
}
@@ -333,21 +334,13 @@ impl<S: Storage + Clone> WickDB<S> {
}
}

-impl<S: Storage + Clone> Clone for WickDB<S> {
-fn clone(&self) -> Self {
-Self {
-inner: self.inner.clone(),
-}
-}
-}

pub struct DBImpl<S: Storage + Clone> {
env: S,
internal_comparator: InternalKeyComparator,
options: Arc<Options>,
// The physical path of wickdb
db_name: &'static str,
-db_lock: Option<Box<dyn File>>,
+db_lock: Option<S::F>,

/*
* Fields for write batch scheduling
@@ -923,7 +916,7 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {

// Merge files in level n into files in level n + 1,
// keeping the still-in-use files
-fn do_compaction(&self, c: &mut Compaction) -> MutexGuard<VersionSet<S>> {
+fn do_compaction(&self, c: &mut Compaction<S::F>) -> MutexGuard<VersionSet<S>> {
let now = SystemTime::now();
let mut input_iter =
c.new_input_iterator(self.internal_comparator.clone(), self.table_cache.clone());
@@ -1110,7 +1103,11 @@ impl<S: Storage + Clone + 'static> DBImpl<S> {
}

// Finish the current output file by calling `builder.finish` and insert it into the table cache
-fn finish_output_file(&self, compact: &mut Compaction, input_iter_valid: bool) -> Result<()> {
+fn finish_output_file(
+&self,
+compact: &mut Compaction<S::F>,
+input_iter_valid: bool,
+) -> Result<()> {
assert!(!compact.outputs.is_empty());
assert!(compact.builder.is_some());
let current_entries = compact.builder.as_ref().unwrap().num_entries();
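
Two changes stand out in `db/mod.rs`: the hand-written `Clone` impl is dropped in favor of `#[derive(Clone)]` (cloning just bumps the `Arc` refcount, and the derive's implicit `S: Clone` bound is already part of `WickDB`'s declaration), and the lock file is stored as the concrete `S::F` rather than a boxed trait object. A minimal sketch of why the derive suffices (stand-in types, not wickdb's):

```rust
use std::sync::Arc;

struct DbInner {
    name: String, // stand-in for DBImpl's real state
}

#[derive(Clone)]
struct Db {
    inner: Arc<DbInner>, // Arc<T>: Clone for any T, so the derive suffices
}

fn main() {
    let a = Db { inner: Arc::new(DbInner { name: "demo".into() }) };
    let b = a.clone();
    // Both handles point at the same shared state:
    assert!(Arc::ptr_eq(&a.inner, &b.inner));
}
```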
14 changes: 7 additions & 7 deletions src/logger.rs
@@ -17,24 +17,24 @@ use std::sync::Mutex;

/// A simple file-based Logger
// TODO: maybe use slog-rs instead
-pub struct Logger {
-file: Mutex<Box<dyn File>>,
+pub struct Logger<F: File> {
+file: Mutex<F>,
level: LevelFilter,
}

-unsafe impl Send for Logger {}
-unsafe impl Sync for Logger {}
+// unsafe impl Send for Logger {}
+// unsafe impl Sync for Logger {}

-impl Logger {
-pub fn new(file: Box<dyn File>, level: LevelFilter) -> Self {
+impl<F: File> Logger<F> {
+pub fn new(file: F, level: LevelFilter) -> Self {
Self {
file: Mutex::new(file),
level,
}
}
}

-impl Log for Logger {
+impl<F: File> Log for Logger<F> {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= self.level
}
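
The deleted `unsafe impl`s are the nicest payoff of the refactor: with a concrete `F`, thread safety is derived instead of asserted, since `Mutex<F>` is automatically `Send + Sync` whenever `F: Send` (for `Box<dyn File>` the compiler could not see any `Send` bound, hence the unsafe escape hatch). A quick sketch of the compiler doing that work:

```rust
use std::sync::Mutex;

struct Logger<F> {
    file: Mutex<F>,
    level: u8, // stand-in for log::LevelFilter
}

fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // Compiles because std::fs::File is Send, so Mutex<File> is Send + Sync.
    assert_send_sync::<Logger<std::fs::File>>();
    // A non-Send file type (e.g. one holding an Rc) would now be rejected
    // at compile time instead of being waved through by an unsafe impl.
}
```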
8 changes: 6 additions & 2 deletions src/options.rs
@@ -23,7 +23,7 @@ use crate::logger::Logger;
use crate::options::CompressionType::{NoCompression, SnappyCompression, Unknown};
use crate::snapshot::Snapshot;
use crate::sstable::block::Block;
-use crate::storage::Storage;
+use crate::storage::{File, Storage};
use crate::util::comparator::{BytewiseComparator, Comparator};
use crate::LevelFilter;
use crate::Log;
@@ -206,7 +206,11 @@ impl Options {
}

/// Initialize Options by limiting the ranges of some flags, applying a customized Logger, etc.
-pub(crate) fn initialize(&mut self, db_name: String, storage: &dyn Storage) {
+pub(crate) fn initialize<O: File + 'static, S: Storage<F = O>>(
+&mut self,
+db_name: String,
+storage: &S,
+) {
self.max_open_files =
Self::clip_range(self.max_open_files, 64 + self.non_table_cache_files, 50000);
self.write_buffer_size = Self::clip_range(self.write_buffer_size, 64 << 10, 1 << 30);
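
The new `initialize` signature uses an associated-type equality bound: `S: Storage<F = O>` introduces a separate parameter `O` for the file type so extra bounds can be attached to it — `'static` here, presumably so a `Logger<O>` can be installed as the global logger. A self-contained sketch of the bound (trait shapes are assumptions):

```rust
trait File {}
trait Storage {
    type F: File;
}

// `O` names S's file type so it can carry its own bounds ('static here).
fn initialize<O: File + 'static, S: Storage<F = O>>(_storage: &S) {
    // e.g. build a Logger<O> and hand it to the log facade, which requires
    // a 'static payload (illustrative, not the crate's exact code).
}
```

An equivalent spelling without the extra parameter would be `fn initialize<S: Storage>(...) where S::F: 'static`.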
25 changes: 12 additions & 13 deletions src/record/mod.rs
@@ -100,6 +100,10 @@ mod tests {
returned_partial: bool,
}

+// Just to satisfy rustc
+unsafe impl Send for StringFile {}
+unsafe impl Sync for StringFile {}

impl StringFile {
pub fn new(data: Rc<RefCell<Vec<u8>>>) -> Self {
Self {
@@ -206,8 +210,8 @@ mod tests {
read_source: StringFile,
reporter: ReportCollector,
reading: bool,
-reader: Reader,
-writer: Writer,
+reader: Reader<StringFile>,
+writer: Writer<StringFile>,
}
const INITIAL_OFFSET_RECORD_SIZES: [usize; 6] = [
10000,
@@ -233,25 +237,20 @@
pub fn new(reporter: ReportCollector) -> Self {
let data = Rc::new(RefCell::new(vec![]));
let f = StringFile::new(data.clone());
-let writer = Writer::new(Box::new(f.clone()));
+let writer = Writer::new(f.clone());
Self {
source: data.clone(),
read_source: f.clone(),
reporter: reporter.clone(),
reading: false,
-reader: Reader::new(
-Box::new(f.clone()),
-Some(Box::new(reporter.clone())),
-true,
-0,
-),
+reader: Reader::new(f.clone(), Some(Box::new(reporter.clone())), true, 0),
writer,
}
}

// Replace the current writer with a new one created from the current StringFile
pub fn reopen_for_append(&mut self) {
-let writer = Writer::new(Box::new(StringFile::new(self.source.clone())));
+let writer = Writer::new(StringFile::new(self.source.clone()));
self.writer = writer;
}

@@ -330,7 +329,7 @@

pub fn start_reading_at(&mut self, initial_offset: u64) {
self.reader = Reader::new(
-Box::new(self.read_source.clone()),
+self.read_source.clone(),
Some(Box::new(self.reporter.clone())),
true,
initial_offset,
@@ -343,7 +342,7 @@
self.reading = true;
let size = self.written_bytes() as u64;
let mut reader = Reader::new(
-Box::new(self.read_source.clone()),
+self.read_source.clone(),
Some(Box::new(self.reporter.clone())),
true,
size + offset_past_end,
@@ -361,7 +360,7 @@
self.write_initial_offset_log();
self.reading = true;
let mut reader = Reader::new(
-Box::new(self.read_source.clone()),
+self.read_source.clone(),
Some(Box::new(self.reporter.clone())),
true,
initial_offset,
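
The new `unsafe impl`s for `StringFile` exist because the test file wraps `Rc<RefCell<Vec<u8>>>`, which is neither `Send` nor `Sync`; once `StringFile` is used as a concrete `F`, whatever `Send`/`Sync` bounds the generic code carries surface at compile time (the comment "Just to satisfy rustc" says as much). This is sound only because the tests stay on one thread. A sketch of the shape (mirroring, not copying, the test type):

```rust
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Clone)]
struct StringFile {
    data: Rc<RefCell<Vec<u8>>>, // cheap clones sharing one buffer; not thread safe
}

// Test-only escape hatch: Rc/RefCell are !Send/!Sync, so this is sound
// only while every access happens on a single thread.
unsafe impl Send for StringFile {}
unsafe impl Sync for StringFile {}
```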
10 changes: 5 additions & 5 deletions src/record/reader.rs
@@ -51,9 +51,9 @@ pub trait Reporter {

/// A `Reader` is used for reading records from a log file.
/// The `Reader` always starts reading the records at `initial_offset` of the `file`.
-pub struct Reader {
+pub struct Reader<F: File> {
// NOTICE: we probably mutate the underlying file by calling `seek()`, and this is not thread safe
-file: Box<dyn File>,
+file: F,
reporter: Option<Box<dyn Reporter>>,
// Whether we should verify the checksum for each record
checksum: bool,
@@ -74,9 +74,9 @@ pub struct Reader {
resyncing: bool,
}

-impl Reader {
+impl<F: File> Reader<F> {
pub fn new(
-file: Box<dyn File>,
+file: F,
reporter: Option<Box<dyn Reporter>>,
checksum: bool,
initial_offset: u64,
@@ -97,7 +97,7 @@ impl Reader {

/// Hands over ownership of the underlying file
#[inline]
-pub fn into_file(self) -> Box<dyn File> {
+pub fn into_file(self) -> F {
self.file
}

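
With `Reader<F: File>`, `into_file` returns the concrete `F` by value: the caller gets its file back statically typed, where the old `Box<dyn File>` return would have needed a downcast (or re-boxing) to recover it. A self-contained sketch of the round-trip (stand-in trait and type):

```rust
trait File {}

struct MemFile {
    data: Vec<u8>, // illustrative in-memory file
}
impl File for MemFile {}

struct Reader<F: File> {
    file: F,
}

impl<F: File> Reader<F> {
    fn new(file: F) -> Self {
        Reader { file }
    }
    // Hands the file back with its concrete type intact.
    fn into_file(self) -> F {
        self.file
    }
}

fn main() {
    let r = Reader::new(MemFile { data: vec![1, 2, 3] });
    let f: MemFile = r.into_file(); // no Box, no downcast
    assert_eq!(f.data, vec![1, 2, 3]);
}
```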