Skip to content
This repository has been archived by the owner on Sep 3, 2021. It is now read-only.

test idbstore against Store trait tests #58

Merged
merged 7 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 118 additions & 59 deletions src/kv/idbstore.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::kv::{Read, Result, Store, StoreError, Write};
use async_std::sync::{Arc, Condvar, Mutex};
use async_std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use async_std::task;
use async_trait::async_trait;
use futures::channel::oneshot;
Expand All @@ -8,7 +8,7 @@ use log::warn;
use std::collections::HashMap;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::{JsCast, JsValue};
use web_sys::{IdbDatabase, IdbObjectStore, IdbTransaction};
use web_sys::{IdbDatabase, IdbTransaction};

impl From<String> for StoreError {
fn from(err: String) -> StoreError {
Expand All @@ -30,7 +30,52 @@ impl From<futures::channel::oneshot::Canceled> for StoreError {
}

pub struct IdbStore {
db: IdbDatabase,
// We would like:
// - tests that verify essential behavior such as tx isolation.
// - Store to be strictly serializabile.
// - implementations of Store to work in as close to the same way as possible
// in order to ensure our tests are realistic and to make replicache easy
// to reason about.
//
// Idb v2 is (strictly I think) serializable but its API makes it a bit awkward to
// test: a tx can be created and begin accepting requests before the transaction
// actually starts, which happens asynchronously and opaquely. This
// means you can open 20 write txs in parallel and start
// sending them requests and while only one of them will actually start executing,
// you can't tell which one.
//
// Note:
// - per https://www.w3.org/TR/IndexedDB-2/#transaction-lifetime-concept
// read transacitons can be executed concurrently with readwrite txs
// if the read tx is snapshot isolated and started before the readwrite tx.
// Browsers however do not do this.
// - idb v1 api allowed read txs to be re-ordered before write txs, meaning
// that indexdb was potentially not read-after-write. Chrome apparently had this
// behavior: https://lists.w3.org/Archives/Public/public-webapps/2014JanMar/0586.html).
//
// The Memstore implementation we wrote had a simpler to implement interface: at most
// one write tx can be instantiated at any time and it must be exclusive of all other txs;
// callers wait asynchronosly to start txs until this constraint can be met.
//
// Here we use a RwLock around the underlying idb in order to bring the memstore
// behavior (caller asynchronously waits to open a tx until it can proceed safely) to idb
// (caller creates a tx and sends it requests and it starts asynchronously and opaquely).
// This RwLock makes the Idbstore work like the Memstore, makes it easy to test,
// and (in my mind) makes it easier to reason about. In principle adding this lock
// mirrors the constraints in play under the hood, so in principle nbd, but there
// are probably practical considerations that make this approach less efficient
// (e.g. if implementations increase concurrency with the snapshot isolation
// loophole above). It's also the case that we lose a measure of fairness implemented by
// idb, per the spec: "User agents must ensure a reasonable level of fairness across
// transactions to prevent starvation. For example, if multiple read-only transactions
// are started one after another the implementation must not indefinitely prevent a
// pending read/write transaction from starting." Using the RwLock means the IdbStore is
// serializable, but not strictly so because the RwLock is not fair and so we don't
// guarantee temporal ordering (anyone waiting might acquire the lock).
//
// It's possible we should have gone the other way and made memstore have the idb
// interface. However the thing we should not do is have memstore and idbstore work differently.
db: RwLock<IdbDatabase>,
}

const OBJECT_STORE: &str = "chunks";
Expand Down Expand Up @@ -67,7 +112,7 @@ impl IdbStore {
request.set_onupgradeneeded(Some(onupgradeneeded.as_ref().unchecked_ref()));
receiver.await?;
Ok(Some(IdbStore {
db: request.result()?.into(),
db: RwLock::new(request.result()?.into()),
}))
}

Expand All @@ -89,61 +134,71 @@ impl IdbStore {
#[async_trait(?Send)]
impl Store for IdbStore {
async fn read<'a>(&'a self) -> Result<Box<dyn Read + 'a>> {
Ok(Box::new(ReadTransaction::new(self)?))
let db_guard = self.db.read().await;
let tx = db_guard.transaction_with_str(OBJECT_STORE)?;
Ok(Box::new(ReadTransaction::new(db_guard, tx)?))
}

async fn write<'a>(&'a self) -> Result<Box<dyn Write + 'a>> {
Ok(Box::new(WriteTransaction::new(self)?))
let db_guard = self.db.write().await;
let tx = db_guard
.transaction_with_str_and_mode(OBJECT_STORE, web_sys::IdbTransactionMode::Readwrite)?;
Ok(Box::new(WriteTransaction::new(db_guard, tx)?))
}
}

struct ReadTransaction {
struct ReadTransaction<'a> {
#[allow(dead_code)]
db: RwLockReadGuard<'a, IdbDatabase>,
tx: IdbTransaction,
store: IdbObjectStore,
}

impl ReadTransaction {
fn new(store: &IdbStore) -> Result<ReadTransaction> {
let tx = store.db.transaction_with_str(OBJECT_STORE)?;
Ok(ReadTransaction {
store: tx.object_store(OBJECT_STORE)?,
tx,
})
impl ReadTransaction<'_> {
fn new(db: RwLockReadGuard<'_, IdbDatabase>, tx: IdbTransaction) -> Result<ReadTransaction> {
Ok(ReadTransaction { db, tx })
}
}

#[async_trait(?Send)]
impl Read for ReadTransaction {
impl Read for ReadTransaction<'_> {
async fn has(&self, key: &str) -> Result<bool> {
let request = self.store.count_with_key(&key.into())?;
let (callback, receiver) = IdbStore::oneshot_callback();
request.set_onsuccess(Some(callback.as_ref().unchecked_ref()));
request.set_onerror(Some(callback.as_ref().unchecked_ref()));
receiver.await?;
let result = request.result()?;
Ok(match result.as_f64() {
Some(v) if v >= 1.0 => true,
Some(_) => false,
_ => {
warn!("IdbStore.count returned non-float {:?}", result);
false
}
})
has_impl(&self.tx, key).await
}

async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
let request = self.store.get(&key.into())?;
let (callback, receiver) = IdbStore::oneshot_callback();
request.set_onsuccess(Some(callback.as_ref().unchecked_ref()));
request.set_onerror(Some(callback.as_ref().unchecked_ref()));
receiver.await?;
Ok(match request.result()? {
v if v.is_undefined() => None,
v => Some(js_sys::Uint8Array::new(&v).to_vec()),
})
get_impl(&self.tx, key).await
}
}

async fn has_impl(tx: &IdbTransaction, key: &str) -> Result<bool> {
let request = tx.object_store(OBJECT_STORE)?.count_with_key(&key.into())?;
let (callback, receiver) = IdbStore::oneshot_callback();
request.set_onsuccess(Some(callback.as_ref().unchecked_ref()));
request.set_onerror(Some(callback.as_ref().unchecked_ref()));
receiver.await?;
let result = request.result()?;
Ok(match result.as_f64() {
Some(v) if v >= 1.0 => true,
Some(_) => false,
_ => {
warn!("IdbStore.count returned non-float {:?}", result);
false
}
})
}

async fn get_impl(tx: &IdbTransaction, key: &str) -> Result<Option<Vec<u8>>> {
let request = tx.object_store(OBJECT_STORE)?.get(&key.into())?;
let (callback, receiver) = IdbStore::oneshot_callback();
request.set_onsuccess(Some(callback.as_ref().unchecked_ref()));
request.set_onerror(Some(callback.as_ref().unchecked_ref()));
receiver.await?;
Ok(match request.result()? {
v if v.is_undefined() => None,
v => Some(js_sys::Uint8Array::new(&v).to_vec()),
})
}

#[derive(PartialEq, Eq, Debug)]
enum WriteState {
Open,
Expand All @@ -152,29 +207,26 @@ enum WriteState {
Errored,
}

struct WriteTransaction {
rt: ReadTransaction,
struct WriteTransaction<'a> {
#[allow(dead_code)]
db: RwLockWriteGuard<'a, IdbDatabase>,
tx: IdbTransaction,
pending: Mutex<HashMap<String, Option<Vec<u8>>>>,
pair: Arc<(Mutex<WriteState>, Condvar)>,
callbacks: Vec<Closure<dyn FnMut()>>,
}

impl WriteTransaction {
fn new(store: &IdbStore) -> Result<WriteTransaction> {
let tx = store
.db
.transaction_with_str_and_mode(OBJECT_STORE, web_sys::IdbTransactionMode::Readwrite)?;
impl WriteTransaction<'_> {
fn new(db: RwLockWriteGuard<'_, IdbDatabase>, tx: IdbTransaction) -> Result<WriteTransaction> {
let mut wt = WriteTransaction {
rt: ReadTransaction {
store: tx.object_store(OBJECT_STORE)?,
tx,
},
db,
tx,
pair: Arc::new((Mutex::new(WriteState::Open), Condvar::new())),
pending: Mutex::new(HashMap::new()),
callbacks: Vec::with_capacity(3),
};

let tx = &wt.rt.tx;
let tx = &wt.tx;
let callback = wt.tx_callback(WriteState::Committed);
tx.set_oncomplete(Some(callback.as_ref().unchecked_ref()));
wt.callbacks.push(callback);
Expand Down Expand Up @@ -204,30 +256,32 @@ impl WriteTransaction {
}

#[async_trait(?Send)]
impl Read for WriteTransaction {
impl Read for WriteTransaction<'_> {
async fn has(&self, key: &str) -> Result<bool> {
match self.pending.lock().await.get(key) {
Some(Some(_)) => Ok(true),
Some(None) => Ok(false),
None => self.rt.has(key).await,
None => has_impl(&self.tx, key).await,
}
}

async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
match self.pending.lock().await.get(key) {
Some(Some(v)) => Ok(Some(v.to_vec())),
Some(None) => Ok(None),
None => self.rt.get(key).await,
None => get_impl(&self.tx, key).await,
}
}
}

#[async_trait(?Send)]
impl Write for WriteTransaction {
impl Write for WriteTransaction<'_> {
fn as_read(&self) -> &dyn Read {
self
}

// We hold writes in memory until the API user calls commit
// to ensure that we don't let partial transactions auto-commit.
async fn put(&self, key: &str, value: &[u8]) -> Result<()> {
self.pending
.lock()
Expand All @@ -251,7 +305,7 @@ impl Write for WriteTransaction {
return Ok(());
}

let store = self.rt.tx.object_store(OBJECT_STORE)?;
let store = self.tx.object_store(OBJECT_STORE)?;
let mut callbacks = Vec::with_capacity(pending.len());
let mut requests: Vec<oneshot::Receiver<()>> = Vec::with_capacity(pending.len());
for (key, value) in pending.iter() {
Expand All @@ -270,7 +324,7 @@ impl Write for WriteTransaction {
let state = cv
.wait_until(lock.lock().await, |state| *state != WriteState::Open)
.await;
if let Some(e) = self.rt.tx.error() {
if let Some(e) = self.tx.error() {
return Err(format!("{:?}", e).into());
}
if *state != WriteState::Committed {
Expand All @@ -292,11 +346,11 @@ impl Write for WriteTransaction {
_ => (),
}

self.rt.tx.abort()?;
self.tx.abort()?;
let state = cv
.wait_until(lock.lock().await, |state| *state != WriteState::Open)
.await;
if let Some(e) = self.rt.tx.error() {
if let Some(e) = self.tx.error() {
return Err(format!("{:?}", e).into());
}
if *state != WriteState::Aborted {
Expand All @@ -305,3 +359,8 @@ impl Write for WriteTransaction {
Ok(())
}
}

mod tests {
// Idbstore is integration tested because web_sys only lives in browsers.
// See tests/ at top level.
}
Loading