Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add wasm-bindgen-futures as an optional runtime for AsyncDB, target wasm32-unknown-unknown support #49

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,25 @@ fs2 = { optional = true, version = "0.4.3" }

tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" }
async-std = { optional = true, version = "1.12.0" }
wasm-bindgen-futures = { optional = true, version = "0.4.24" }
getrandom = { optional = true, version = "0.2.15", features = ["js"] }

[features]
default = ["fs"]
default = ["fs", "asyncdb-wasm-bindgen-futures"]
async = ["asyncdb-tokio"]
asyncdb-tokio = ["tokio"]
asyncdb-async-std = ["async-std"]
asyncdb-wasm-bindgen-futures = [
"wasm-bindgen-futures",
"async-std/async-channel",
"getrandom",
]
fs = ["errno", "fs2"]

[dev-dependencies]
time-test = "0.2"
bencher = "0.1"
wasm-bindgen-test = "0.3.0"

[[bench]]
name = "maps_bench"
Expand Down
81 changes: 78 additions & 3 deletions src/asyncdb.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::hash_map::HashMap;

use crate::{
send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch,
DB,
send_response, send_response_result, snapshot::Snapshot, AsyncDB, Message, Result, Status,
StatusCode, WriteBatch, DB,
};

pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Clone, Copy)]
pub struct SnapshotRef(usize);
pub struct SnapshotRef(pub(crate) usize);

/// A request sent to the database thread.
pub(crate) enum Request {
Expand Down Expand Up @@ -151,6 +151,78 @@ impl AsyncDB {
}
}

pub(crate) fn match_message(
db: &mut DB,
mut recv: impl ReceiverExt<Message>,
snapshots: &mut HashMap<usize, Snapshot>,
snapshot_counter: &mut usize,
message: Message,
) {
match message.req {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you refactor run_server() to use match_message()?

Request::Close => {
send_response(message.resp_channel, Response::OK);
recv.close();
return;
}
Request::Put { key, val } => {
let ok = db.put(&key, &val);
send_response_result(message.resp_channel, ok);
}
Request::Delete { key } => {
let ok = db.delete(&key);
send_response_result(message.resp_channel, ok);
}
Request::Write { batch, sync } => {
let ok = db.write(batch, sync);
send_response_result(message.resp_channel, ok);
}
Request::Flush => {
let ok = db.flush();
send_response_result(message.resp_channel, ok);
}
Request::GetAt { snapshot, key } => {
let snapshot_id = snapshot.0;
if let Some(snapshot) = snapshots.get(&snapshot_id) {
let ok = db.get_at(snapshot, &key);
match ok {
Err(e) => {
send_response(message.resp_channel, Response::Error(e));
}
Ok(v) => {
send_response(message.resp_channel, Response::Value(v));
}
};
} else {
send_response(
message.resp_channel,
Response::Error(Status {
code: StatusCode::AsyncError,
err: "Unknown snapshot reference: this is a bug".to_string(),
}),
);
}
}
Request::Get { key } => {
let r = db.get(&key);
send_response(message.resp_channel, Response::Value(r));
}
Request::GetSnapshot => {
snapshots.insert(*snapshot_counter, db.get_snapshot());
let sref = SnapshotRef(*snapshot_counter);
*snapshot_counter += 1;
send_response(message.resp_channel, Response::Snapshot(sref));
}
Request::DropSnapshot { snapshot } => {
snapshots.remove(&snapshot.0);
send_response_result(message.resp_channel, Ok(()));
}
Request::CompactRange { from, to } => {
let ok = db.compact_range(&from, &to);
send_response_result(message.resp_channel, ok);
}
}
}

pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt<Message>) {
let mut snapshots = HashMap::new();
let mut snapshot_counter: usize = 0;
Expand Down Expand Up @@ -225,5 +297,8 @@ impl AsyncDB {

pub(crate) trait ReceiverExt<T> {
fn blocking_recv(&mut self) -> Option<T>;
async fn recv(&mut self) -> Option<T> {
self.blocking_recv()
}
fn close(&mut self);
}
1 change: 1 addition & 0 deletions src/asyncdb_async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) struct Message {
pub(crate) req: Request,
pub(crate) resp_channel: channel::Sender<Response>,
}

/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
#[derive(Clone)]
Expand Down
151 changes: 151 additions & 0 deletions src/asyncdb_wasm_bindgen_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::collections::HashMap;
use std::path::Path;

use async_std::channel::{self, TryRecvError};
use wasm_bindgen_futures::spawn_local;

use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE};
use crate::snapshot::Snapshot;
use crate::{Options, Result, Status, StatusCode, DB};

pub(crate) struct Message {
pub(crate) req: Request,
pub(crate) resp_channel: channel::Sender<Response>,
}

/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
#[derive(Clone)]
pub struct AsyncDB {
shutdown: channel::Sender<()>,
send: channel::Sender<Message>,
}

impl AsyncDB {
/// Create a new or open an existing database.
pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
let db = DB::open(name, opts)?;

let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE);
let (shutdown, shutdown_recv) = channel::bounded(1);

spawn_local(async move {
AsyncDB::run_server_async(db, recv, shutdown_recv, HashMap::new(), 0).await;
});

Ok(AsyncDB { shutdown, send })
}

pub(crate) async fn process_request(&self, req: Request) -> Result<Response> {
let (tx, rx) = channel::bounded(1);

let m = Message {
req,
resp_channel: tx,
};
if let Err(e) = self.send.send(m).await {
return Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
});
}
let resp = rx.recv().await;
match resp {
Err(e) => Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
}),
Ok(r) => Ok(r),
}
}

pub(crate) async fn run_server_async(
mut db: DB,
mut recv: impl ReceiverExt<Message> + Clone + 'static,
mut shutdown: impl ReceiverExt<()> + Clone + 'static,
mut snapshots: HashMap<usize, Snapshot>,
mut snapshot_counter: usize,
) {
if let Some(message) = recv.recv().await {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the plan/idea behind this? For AsyncDB, it's clear: run the blocking I/O bound code on a background thread to keep the main loop responsive.

Why then implement the background thread using asynchronous I/O? If

  1. you don't have real threads in whatever runtime you're running your code, and run this in a background task on Tokio/Async-std/whatever, you can do all DB operations in-line in your tasks, with less overhead (no channel communication etc.);
  2. if you do have real threads in your runtime, you can just use the original implementation.

From what I can see here it look like we're dealing with case 1, that's how spawn_local() is documented in the wasm_bindgen_futures crate.

What am I missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a bit weird to be honest (and even weirder when you try an implement an Env that will work in WASM/the browser).

My goal was to get this to work with wasm-bindgen-futures, so that it can run in a WASM environment in the browser. Right now, that would mean in-memory only, although I was starting to poke at what a WASM-based Env would look like to run it e.g. in a browser.

The JS runtime is single-threaded, but uses Promises for asynchronous scheduling, so that the browser doesn't block on waiting for something. Though, working through the Env trait, I think it might be more challenging to implement (e.g. how do we sleep in WASM when we don't have time available), so any wasm32-unknown-unknown target would be in-memory only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and ultimately it would be backed by IndexedDB, which is LevelDB at it's core so it's a bit of a :spider-man-pointing: situation -- maybe it's better to build what I'm building in IndexedDB directly for WASM and use this for the non-WASM Rust stuff)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your last point: unless the IndexedDB API is so restrictive as to be useless (I don't know it myself), it is probably best to use it directly, and instead abstract LevelDB under a common database interface (instead of abstracting storage below LevelDB). It might save you some time and sweat.

Self::match_message(
&mut db,
recv.clone(),
&mut snapshots,
&mut snapshot_counter,
message,
);
}

spawn_local(async move {
// check shutdown
if let Some(()) = shutdown.recv().await {
return;
} else {
AsyncDB::run_server_async(db, recv, shutdown, snapshots, snapshot_counter).await
};
});
}

pub(crate) async fn stop_server_async(&self) {
self.shutdown.close();
}
}

pub(crate) fn send_response_result(ch: channel::Sender<Response>, result: Result<()>) {
if let Err(e) = result {
ch.try_send(Response::Error(e)).ok();
} else {
ch.try_send(Response::OK).ok();
}
}

pub(crate) fn send_response(ch: channel::Sender<Response>, res: Response) {
ch.send_blocking(res).ok();
}

impl ReceiverExt<Message> for channel::Receiver<Message> {
fn blocking_recv(&mut self) -> Option<Message> {
self.recv_blocking().ok()
}

fn close(&mut self) {
channel::Receiver::close(self);
}

async fn recv(&mut self) -> Option<Message> {
channel::Receiver::recv(&self).await.ok()
}
}

impl ReceiverExt<()> for channel::Receiver<()> {
fn blocking_recv(&mut self) -> Option<()> {
self.recv_blocking().ok()
}

fn close(&mut self) {
channel::Receiver::close(self);
}

async fn recv(&mut self) -> Option<()> {
match channel::Receiver::try_recv(&self) {
Ok(_) => Some(()),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Closed) => Some(()),
}
}
}

#[cfg(test)]
pub mod tests {
use crate::{in_memory, AsyncDB};
use wasm_bindgen_test::wasm_bindgen_test;

#[wasm_bindgen_test]
async fn test_asyncdb() {
let db = AsyncDB::new("test.db", in_memory()).unwrap();
db.put(b"key".to_vec(), b"value".to_vec()).await.unwrap();
let val = db.get(b"key".to_vec()).await.unwrap();
assert_eq!(val, Some(b"value".to_vec()));
db.stop_server_async().await;
}
}
13 changes: 12 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ extern crate time_test;
#[macro_use]
mod infolog;

#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))]
#[cfg(any(
feature = "asyncdb-tokio",
feature = "asyncdb-async-std",
feature = "asyncdb-wasm-bindgen-futures"
))]
mod asyncdb;

#[cfg(feature = "asyncdb-tokio")]
Expand All @@ -53,6 +57,11 @@ mod asyncdb_async_std;
#[cfg(feature = "asyncdb-async-std")]
use asyncdb_async_std::{send_response, send_response_result, Message};

#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
mod asyncdb_wasm_bindgen_futures;
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
use self::asyncdb_wasm_bindgen_futures::{send_response, send_response_result, Message};

mod block;
mod block_builder;
mod blockhandle;
Expand Down Expand Up @@ -96,6 +105,8 @@ pub mod env;
pub use asyncdb_async_std::AsyncDB;
#[cfg(feature = "asyncdb-tokio")]
pub use asyncdb_tokio::AsyncDB;
#[cfg(feature = "asyncdb-wasm-bindgen-futures")]
pub use asyncdb_wasm_bindgen_futures::AsyncDB;
pub use cmp::{Cmp, DefaultCmp};
pub use compressor::{Compressor, CompressorId};
pub use db_impl::DB;
Expand Down
Loading