support inline skiplist (#85)
Introduce a concurrent skiplist to support lock-free operations on the memtable. This is the first step toward scaling the throughput of write-request processing.
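
The skiplist implementation itself sits in files not rendered on this page, so as a rough, illustrative sketch only (not the PR's code; every name below is made up): the primitive a lock-free skiplist insert is built on is publishing a node with a compare-and-swap on an atomic next pointer, retrying on contention.

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};

    struct Node {
        key: u64,
        // One forward pointer; a real skiplist node keeps a tower of these, one per level.
        next: AtomicPtr<Node>,
    }

    // Link `node` in directly after `prev` without taking a lock, retrying when
    // a concurrent insert wins the race. A lock-free skiplist repeats this step
    // at each level of the tower, from the bottom level upwards.
    unsafe fn link_after(prev: &Node, node: *mut Node) {
        loop {
            let succ = prev.next.load(Ordering::Acquire);
            (*node).next.store(succ, Ordering::Relaxed);
            if prev
                .next
                .compare_exchange(succ, node, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return; // `node` is now visible to concurrent readers
            }
            // Lost the race: `prev.next` changed underneath us, so retry.
        }
    }

    fn main() {
        let head = Node { key: 0, next: AtomicPtr::new(ptr::null_mut()) };
        let node = Box::into_raw(Box::new(Node { key: 42, next: AtomicPtr::new(ptr::null_mut()) }));
        unsafe { link_after(&head, node) };
        unsafe { drop(Box::from_raw(node)) } // freed by hand here; an arena-backed list frees in bulk
    }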

Signed-off-by: accelsao <[email protected]>
Co-authored-by: Fullstop000 <[email protected]>
jayzhan211 and Fullstop000 authored Nov 9, 2020
1 parent 4808826 commit 9014de2
Showing 12 changed files with 1,012 additions and 366 deletions.
521 changes: 263 additions & 258 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -5,6 +5,7 @@ name = "wickdb"
version = "0.1.0"

[dependencies]
bytes = "0.5.6"
crc32c = "0.4.0"
crossbeam-channel = "0.4.0"
crossbeam-utils = "0.7.0"
16 changes: 8 additions & 8 deletions benches/mem/arena.rs
@@ -3,7 +3,7 @@ use wickdb::mem::arena::*;

static CHUNK_SIZE: [usize; 6] = [256, 1024, 4096, 10000, 16384, 33333];

fn bench_allocate(c: &mut Criterion) {
fn bench_block_arena_allocate(c: &mut Criterion) {
let mut group = c.benchmark_group("BlockArena::allocate");
for size in CHUNK_SIZE.clone().iter() {
group.bench_with_input(
@@ -12,24 +12,24 @@ fn bench_allocate(c: &mut Criterion) {
|b: &mut Bencher, size| {
b.iter_batched(
|| BlockArena::default(),
|arena| arena.allocate(*size),
|arena| arena.allocate::<u8>(*size, 8),
BatchSize::PerIteration,
);
},
);
}
}

fn bench_allocate_aligned(c: &mut Criterion) {
let mut group = c.benchmark_group("BlockArena::allocate_aligned");
fn bench_offset_arena_allocate(c: &mut Criterion) {
let mut group = c.benchmark_group("OffsetArena::allocate");
for size in CHUNK_SIZE.clone().iter() {
group.bench_with_input(
BenchmarkId::from_parameter(size),
size,
|b: &mut Bencher, size| {
b.iter_batched(
|| BlockArena::default(),
|arena| arena.allocate_aligned(*size),
|| ArenaV2::with_capacity(1 << 10),
|arena| arena.allocate::<u8>(*size, 8),
BatchSize::PerIteration,
);
},
@@ -38,6 +38,6 @@ fn bench_allocate_aligned(c: &mut Criterion) {
}

pub fn bench_arena(c: &mut Criterion) {
bench_allocate(c);
bench_allocate_aligned(c);
bench_block_arena_allocate(c);
bench_offset_arena_allocate(c);
}
2 changes: 1 addition & 1 deletion benches/mem/skiplist.rs
@@ -18,7 +18,7 @@ fn bench_insert(c: &mut Criterion) {
vec![0u8; *length],
)
},
|(s, key)| s.insert(&key),
|(s, key)| s.insert(key),
BatchSize::PerIteration,
)
},
2 changes: 1 addition & 1 deletion rust-toolchain
@@ -1 +1 @@
stable
nightly
3 changes: 3 additions & 0 deletions src/db/mod.rs
@@ -200,6 +200,7 @@ impl<S: Storage + Clone, C: Comparator + 'static> WickDB<S, C> {
wick_db.process_compaction();
wick_db.process_batch();
// Schedule a compaction to current version for potential unfinished work
debug!("Try to schedule a compaction on opening db");
wick_db.inner.maybe_schedule_compaction(current);
Ok(wick_db)
}
@@ -1887,7 +1888,9 @@ mod tests {
thread::sleep(Duration::from_secs(2));
t.assert_file_num_at_level(2, 1);
// Try to retrieve key "foo" from level 0 files
t.assert_get("k1", Some(&"x".repeat(100000)));
assert_eq!("v1", t.get("foo", None).unwrap()); // "v1" on SST files
t.assert_get("k2", Some(&"y".repeat(100000)));
}
}

1 change: 1 addition & 0 deletions src/lib.rs
@@ -26,6 +26,7 @@ extern crate slog_async;
extern crate slog_term;
#[macro_use]
extern crate num_derive;
extern crate bytes;
extern crate quick_error;
extern crate rand;
extern crate snap;
126 changes: 86 additions & 40 deletions src/mem/arena.rs
@@ -13,22 +13,97 @@

use std::cell::RefCell;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

const BLOCK_SIZE: usize = 4096;

// TODO(fullstop000): Add Send + Sync constraint?
pub trait Arena {
/// Return a pointer to a newly allocated memory block of 'chunk' bytes.
fn allocate(&self, chunk: usize) -> *mut u8;

/// Allocate memory with the normal alignment guarantees provided by underlying allocator.
/// NOTE: the implementation is aligned with usize (32 or 64)
fn allocate_aligned(&self, aligned: usize) -> *mut u8;
/// Return the start pointer to a newly allocated memory block of `chunk` bytes, aligned to `align`.
fn allocate<T>(&self, chunk: usize, align: usize) -> *mut T;

/// Return the size of memory that has been allocated.
fn memory_used(&self) -> usize;
}

struct ArenaInner {
len: AtomicUsize,
cap: usize,
ptr: *mut u8,
}

#[derive(Clone)]
pub struct ArenaV2 {
inner: Arc<ArenaInner>,
}

impl Drop for ArenaInner {
fn drop(&mut self) {
// Manually free the buffer: rebuild the Vec it was created from and drop it.
if !self.ptr.is_null() {
unsafe {
let ptr = self.ptr as *mut u64;
let cap = self.cap / 8;
Vec::from_raw_parts(ptr, 0, cap);
}
}
}
}

impl Arena for ArenaV2 {
fn allocate<T>(&self, chunk: usize, align: usize) -> *mut T {
let offset = self.alloc(align, chunk);
unsafe { self.get_mut(offset) }
}

/// Return the size of memory that has been allocated.
fn memory_used(&self) -> usize {
self.inner.len.load(Ordering::SeqCst)
}
}

unsafe impl Send for ArenaV2 {}
unsafe impl Sync for ArenaV2 {}

impl ArenaV2 {
// The real capacity will be rounded to a multiple of 8
pub fn with_capacity(cap: usize) -> Self {
let mut buf: Vec<u64> = Vec::with_capacity(cap / 8);
let ptr = buf.as_mut_ptr() as *mut u8;
let cap = buf.capacity() * 8;
mem::forget(buf);
ArenaV2 {
inner: Arc::new(ArenaInner {
len: AtomicUsize::new(1), // offset 0 is reserved to represent null (see `get_mut`)
cap,
ptr,
}),
}
}

// Allocates `size` bytes aligned with `align`
fn alloc(&self, align: usize, size: usize) -> usize {
let align_mask = align - 1;
// Leave enough padding for align.
let size = size + align_mask;
let offset = self.inner.len.fetch_add(size, Ordering::SeqCst);
// Round the offset up to the next multiple of `align` (same as (offset + align_mask) / align * align).
let ptr_offset = (offset + align_mask) & !align_mask;
assert!(offset + size <= self.inner.cap);
ptr_offset
}

// Returns a raw pointer at the given arena offset; offset 0 maps to a null pointer
unsafe fn get_mut<N>(&self, offset: usize) -> *mut N {
if offset == 0 {
return ptr::null_mut();
}
self.inner.ptr.add(offset) as _
}
}
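
As an aside on the arithmetic in `alloc` above: it reserves `size + align - 1` bytes and then rounds the reserved offset up with a mask. A standalone sketch of that align-up trick (illustrative, not part of this diff), valid whenever `align` is a power of two:

    // Round `offset` up to the next multiple of `align`, exactly the
    // `(offset + align_mask) & !align_mask` computation used by `ArenaV2::alloc`.
    fn align_up(offset: usize, align: usize) -> usize {
        debug_assert!(align.is_power_of_two());
        let mask = align - 1;
        (offset + mask) & !mask
    }

    fn main() {
        assert_eq!(align_up(1, 8), 8);   // 1 rounds up to the next 8-byte boundary
        assert_eq!(align_up(8, 8), 8);   // already-aligned offsets are unchanged
        assert_eq!(align_up(13, 4), 16); // 13 -> 16 under 4-byte alignment
    }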

/// `BlockArena` is a memory pool for allocating and handling Node memory dynamically.
/// It's the caller's responsibility to ensure there is enough room before allocating.
///
@@ -47,7 +122,7 @@ pub struct BlockArena {
}

impl BlockArena {
pub(super) fn allocate_fallback(&self, size: usize) -> *mut u8 {
fn allocate_fallback(&self, size: usize) -> *mut u8 {
if size > BLOCK_SIZE / 4 {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
@@ -74,27 +149,9 @@ impl BlockArena {
}

impl Arena for BlockArena {
fn allocate(&self, chunk: usize) -> *mut u8 {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert!(chunk > 0);
if chunk <= self.bytes_remaining.load(Ordering::Acquire) {
let p = self.ptr.load(Ordering::Acquire);
unsafe {
self.ptr.store(p.add(chunk), Ordering::Release);
self.bytes_remaining.fetch_sub(chunk, Ordering::SeqCst);
}
p
} else {
self.allocate_fallback(chunk)
}
}

fn allocate_aligned(&self, chunk: usize) -> *mut u8 {
fn allocate<T>(&self, chunk: usize, align: usize) -> *mut T {
assert!(chunk > 0);
let ptr_size = mem::size_of::<usize>();
let align = if ptr_size > 8 { ptr_size } else { 8 };
// the align should be a pow(2)
assert_eq!(align & (align - 1), 0);

@@ -124,7 +181,7 @@ impl Arena for BlockArena {
"allocated memory should be aligned with {}",
ptr_size
);
result
result as *mut T
}

#[inline]
@@ -153,14 +210,7 @@ mod tests {
#[should_panic]
fn test_allocate_empty_should_panic() {
let a = BlockArena::default();
a.allocate(0);
}

#[test]
#[should_panic]
fn test_allocate_empty_aligned_should_panic() {
let a = BlockArena::default();
a.allocate_aligned(0);
a.allocate::<u8>(0, 0);
}

#[test]
@@ -210,11 +260,7 @@ mod tests {
r.gen_range(1, i)
}
};
let ptr = if i % 2 == 0 {
a.allocate_aligned(size)
} else {
a.allocate(size)
};
let ptr = a.allocate::<u8>(size, 8);
unsafe {
for j in 0..size {
let np = ptr.add(j);
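The net effect of this file's changes is that the old `allocate`/`allocate_aligned` pair collapses into a single generic `allocate::<T>(chunk, align)`. A short usage sketch against the API exactly as the benches above exercise it (assuming, as they do, that `Arena` and `ArenaV2` are reachable under `wickdb::mem::arena`):

    use wickdb::mem::arena::{Arena, ArenaV2};

    fn main() {
        // A 1 KiB arena; offset 0 is reserved as null, so a fresh arena reports 1 byte used.
        let arena = ArenaV2::with_capacity(1 << 10);
        assert_eq!(arena.memory_used(), 1);
        // Allocate 64 bytes with 8-byte alignment; the arena hands back a raw pointer.
        let ptr: *mut u8 = arena.allocate::<u8>(64, 8);
        assert!(!ptr.is_null());
        assert!(arena.memory_used() >= 64);
        // No explicit free: the backing buffer is released when the last `ArenaV2` clone drops.
    }
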
(The remaining 4 changed files are not rendered here.)
