Skip to content

Commit

Permalink
feat(services/rocksdb): change blocking_x in async_x call to tokio::t…
Browse files Browse the repository at this point in the history
…ask::blocking_spawn (#3279)
  • Loading branch information
shauvet authored Oct 14, 2023
1 parent 378d786 commit c5ea00a
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions core/src/services/rocksdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use rocksdb::DB;
use tokio::task;

use crate::raw::adapters::kv;
use crate::raw::*;
use crate::Result;
use crate::*;

Expand Down Expand Up @@ -118,31 +120,52 @@ impl kv::Adapter for Adapter {
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
self.blocking_get(path)
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
.await
.map_err(new_task_join_error)?
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
Ok(self.db.get(path)?)
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
self.blocking_set(path, value)
let cloned_self = self.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
.await
.map_err(new_task_join_error)?
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
Ok(self.db.put(path, value)?)
}

async fn delete(&self, path: &str) -> Result<()> {
self.blocking_delete(path)
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
.await
.map_err(new_task_join_error)?
}

fn blocking_delete(&self, path: &str) -> Result<()> {
Ok(self.db.delete(path)?)
}

async fn scan(&self, path: &str) -> Result<Vec<String>> {
self.blocking_scan(path)
let cloned_self = self.clone();
let cloned_path = path.to_string();

task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str()))
.await
.map_err(new_task_join_error)?
}

/// TODO: we only need key here.
Expand Down

0 comments on commit c5ea00a

Please sign in to comment.