Merge #186
186: Safe grow_hashtable r=Amanieu a=faern

Calling `grow_hashtable` can't cause undefined behavior unless `HASHTABLE` has already been wrongfully manipulated, so there are no safety invariants for the caller to uphold and the function does not need to be `unsafe`. The little unsafe Rust used inside it has instead been contained in small `unsafe` blocks and documented.
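
To illustrate the reasoning with a standalone sketch (not parking_lot code; `TABLE`, `Table` and `table_len` are made up here): a function can be safe while using `unsafe` internally, because the unsafe block relies only on an invariant the module itself maintains. Like `HASHTABLE`, the global starts out null and is only ever set to a leaked, never-freed allocation, so no caller can make the dereference unsound:

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};

    struct Table {
        len: usize,
    }

    // Starts out null and is only ever set to a leaked, never-freed Box.
    static TABLE: AtomicPtr<Table> = AtomicPtr::new(ptr::null_mut());

    // Safe to call: there is no invariant the *caller* could violate. The
    // unsafe dereference is justified by how this module maintains TABLE.
    fn table_len() -> usize {
        let ptr = TABLE.load(Ordering::Acquire);
        if ptr.is_null() {
            return 0;
        }
        // SAFETY: when non-null, TABLE always points to a leaked Box<Table>
        // that is never freed, so the reference stays valid forever.
        let table = unsafe { &*ptr };
        table.len
    }

    fn main() {
        // Publish a table once; Box::into_raw intentionally leaks it.
        TABLE.store(Box::into_raw(Box::new(Table { len: 8 })), Ordering::Release);
        assert_eq!(table_len(), 8);
    }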

The hardest part to understand and read, the linked list handling, is moved into a separate function (`rehash_bucket_into`) to isolate it and make it easier to understand on its own.
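
The extracted function stays `unsafe` and documents its contract under a `# Safety` heading, so the caller knows exactly what to uphold. A minimal sketch of that pattern on a toy raw-pointer list (`Node` and `list_len` are hypothetical, not the real `rehash_bucket_into`):

    use std::ptr;

    struct Node {
        next: *mut Node,
        value: u32,
    }

    /// Count the nodes of a raw singly linked list.
    ///
    /// # Safety
    ///
    /// `head` must be null or point to the first node of a correctly
    /// constructed list: every `next` pointer is null or points to a valid
    /// `Node`, and all nodes must stay alive for the duration of the call.
    unsafe fn list_len(head: *const Node) -> usize {
        let mut len = 0;
        let mut current = head;
        while !current.is_null() {
            len += 1;
            current = (*current).next;
        }
        len
    }

    fn main() {
        let mut second = Node { next: ptr::null_mut(), value: 2 };
        let first = Node { next: &mut second, value: 1 };
        // SAFETY: `first` heads a correctly constructed two-node list that
        // outlives this call.
        assert_eq!(unsafe { list_len(&first) }, 2);
        println!("values: {} -> {}", first.value, second.value);
    }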

Co-authored-by: Linus Färnstrand <[email protected]>
bors[bot] and faern authored Oct 10, 2019
2 parents 4f61bbe + 617c988 commit 99e7e46
Showing 1 changed file with 57 additions and 47 deletions.
core/src/parking_lot.rs (104 changes: 57 additions & 47 deletions)
@@ -153,9 +153,7 @@ impl ThreadData {
         // Keep track of the total number of live ThreadData objects and resize
         // the hash table accordingly.
         let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
-        unsafe {
-            grow_hashtable(num_threads);
-        }
+        grow_hashtable(num_threads);
 
         ThreadData {
             parker: ThreadParker::new(),
@@ -239,57 +237,44 @@ fn create_hashtable() -> &'static HashTable {
 // Grow the hash table so that it is big enough for the given number of threads.
 // This isn't performance-critical since it is only done when a ThreadData is
 // created, which only happens once per thread.
-unsafe fn grow_hashtable(num_threads: usize) {
-    let mut old_table = get_hashtable();
-    loop {
+fn grow_hashtable(num_threads: usize) {
+    // Lock all buckets in the existing table and get a reference to it
+    let old_table = loop {
+        let table = get_hashtable();
+
         // Check if we need to resize the existing table
-        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
+        if table.entries.len() >= LOAD_FACTOR * num_threads {
             return;
         }
 
         // Lock all buckets in the old table
-        for b in &(*old_table).entries[..] {
-            b.mutex.lock();
+        for bucket in &table.entries[..] {
+            bucket.mutex.lock();
         }
 
         // Now check if our table is still the latest one. Another thread could
         // have grown the hash table between us reading HASHTABLE and locking
         // the buckets.
-        if HASHTABLE.load(Ordering::Relaxed) == old_table as *const _ as *mut _ {
-            break;
+        if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
+            break table;
         }
 
         // Unlock buckets and try again
-        for b in &(*old_table).entries[..] {
+        for bucket in &table.entries[..] {
             // SAFETY: We hold the lock here, as required
-            b.mutex.unlock();
+            unsafe { bucket.mutex.unlock() };
         }
-
-        // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
-        // Here we know `get_hashtable` above must have initialized `HASHTABLE`
-        old_table = &*HASHTABLE.load(Ordering::Acquire);
-    }
+    };
 
     // Create the new table
-    let new_table = HashTable::new(num_threads, old_table);
+    let mut new_table = HashTable::new(num_threads, old_table);
 
     // Move the entries from the old table to the new one
-    for b in &(*old_table).entries[..] {
-        let mut current = b.queue_head.get();
-        while !current.is_null() {
-            let next = (*current).next_in_queue.get();
-            let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
-            if new_table.entries[hash].queue_tail.get().is_null() {
-                new_table.entries[hash].queue_head.set(current);
-            } else {
-                (*new_table.entries[hash].queue_tail.get())
-                    .next_in_queue
-                    .set(current);
-            }
-            new_table.entries[hash].queue_tail.set(current);
-            (*current).next_in_queue.set(ptr::null());
-            current = next;
-        }
+    for bucket in &old_table.entries[..] {
+        // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
+        // lists. All `ThreadData` instances in these lists will remain valid as long as they are
+        // present in the lists, meaning as long as their threads are parked.
+        unsafe { rehash_bucket_into(bucket, &mut new_table) };
     }
 
     // Publish the new table. No races are possible at this point because
@@ -298,9 +283,36 @@ unsafe fn grow_hashtable(num_threads: usize) {
     HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
 
     // Unlock all buckets in the old table
-    for b in &(*old_table).entries[..] {
+    for bucket in &old_table.entries[..] {
         // SAFETY: We hold the lock here, as required
-        b.mutex.unlock();
+        unsafe { bucket.mutex.unlock() };
     }
 }
 
+/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
+/// in the bucket their key correspond to for this table.
+///
+/// # Safety
+///
+/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
+/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
+///
+/// The given `table` must only contain buckets with correctly constructed linked lists.
+unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
+    let mut current: *const ThreadData = bucket.queue_head.get();
+    while !current.is_null() {
+        let next = (*current).next_in_queue.get();
+        let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
+        if table.entries[hash].queue_tail.get().is_null() {
+            table.entries[hash].queue_head.set(current);
+        } else {
+            (*table.entries[hash].queue_tail.get())
+                .next_in_queue
+                .set(current);
+        }
+        table.entries[hash].queue_tail.set(current);
+        (*current).next_in_queue.set(ptr::null());
+        current = next;
+    }
+}

@@ -320,12 +332,11 @@ fn hash(key: usize, bits: u32) -> usize {
 /// The returned bucket must be unlocked again in order to not cause deadlocks.
 #[inline]
 fn lock_bucket(key: usize) -> &'static Bucket {
-    let mut bucket;
     loop {
         let hashtable = get_hashtable();
 
         let hash = hash(key, hashtable.hash_bits);
-        bucket = &hashtable.entries[hash];
+        let bucket = &hashtable.entries[hash];
 
         // Lock the bucket
         bucket.mutex.lock();
@@ -347,13 +358,12 @@ fn lock_bucket(key: usize) -> &'static Bucket {
 /// The returned bucket must be unlocked again in order to not cause deadlocks.
 #[inline]
 fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
-    let mut bucket;
     loop {
         let hashtable = get_hashtable();
         let current_key = key.load(Ordering::Relaxed);
 
         let hash = hash(current_key, hashtable.hash_bits);
-        bucket = &hashtable.entries[hash];
+        let bucket = &hashtable.entries[hash];
 
         // Lock the bucket
         bucket.mutex.lock();
@@ -380,18 +390,18 @@ fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
 /// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
 #[inline]
 fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
-    let mut bucket1;
     loop {
         let hashtable = get_hashtable();
 
-        // Get the lowest bucket first
         let hash1 = hash(key1, hashtable.hash_bits);
         let hash2 = hash(key2, hashtable.hash_bits);
-        if hash1 <= hash2 {
-            bucket1 = &hashtable.entries[hash1];
+
+        // Get the bucket at the lowest hash/index first
+        let bucket1 = if hash1 <= hash2 {
+            &hashtable.entries[hash1]
         } else {
-            bucket1 = &hashtable.entries[hash2];
-        }
+            &hashtable.entries[hash2]
+        };
 
         // Lock the first bucket
         bucket1.mutex.lock();
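For context on the loops cleaned up above: every `lock_bucket*` function uses the same validate-after-lock pattern as `grow_hashtable` itself. Read the current table, lock a bucket, then re-check that `HASHTABLE` still points at that table before trusting the lock, since a resize may have raced in between. A rough standalone sketch of the idea (hypothetical types, with a `std::sync::Mutex` standing in for parking_lot's own lock):

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering};
    use std::sync::{Mutex, MutexGuard};

    struct Bucket {
        mutex: Mutex<()>,
    }

    struct Table {
        entries: Vec<Bucket>,
    }

    // Stand-in for HASHTABLE: only ever set to leaked, never-freed allocations.
    static TABLE: AtomicPtr<Table> = AtomicPtr::new(ptr::null_mut());

    // Lock a bucket, then confirm the table pointer is unchanged; if a
    // resize raced with us, the guard is dropped (unlocking the stale
    // bucket) and we retry against the new table.
    fn lock_bucket(key: usize) -> (&'static Bucket, MutexGuard<'static, ()>) {
        loop {
            let table_ptr = TABLE.load(Ordering::Acquire);
            // SAFETY: TABLE is non-null once published in main, and it only
            // ever points to leaked allocations that are never freed.
            let table = unsafe { &*table_ptr };
            let bucket = &table.entries[key % table.entries.len()];

            let guard = bucket.mutex.lock().unwrap();

            if TABLE.load(Ordering::Relaxed) == table_ptr {
                return (bucket, guard);
            }
            // The table was replaced between the load and the lock; `guard`
            // goes out of scope here, unlocking the stale bucket, and we retry.
        }
    }

    fn main() {
        let entries = (0..4).map(|_| Bucket { mutex: Mutex::new(()) }).collect();
        TABLE.store(Box::into_raw(Box::new(Table { entries })), Ordering::Release);

        let (_bucket, guard) = lock_bucket(42);
        drop(guard); // unlock
    }

Unlike this sketch, the real functions return the bucket with its lock still held and rely on the caller to unlock it later, which is why unlocking remains an `unsafe` operation in parking_lot_core.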
