Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
Signed-off-by: Manjusaka <[email protected]>
  • Loading branch information
Zheaoli committed Sep 29, 2023
1 parent 196c09f commit f2c973a
Showing 1 changed file with 66 additions and 43 deletions.
109 changes: 66 additions & 43 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,68 +204,91 @@ impl kv::Adapter for Adapter {
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let connection_string = self.connection_string.clone();
let value_field = self.value_field.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let new_path = path.to_string();

task::spawn_blocking(move || {
self.blocking_get(path);
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
value_field, table, key_field
);
let conn = Connection::open(connection_string).map_err(|err| Error::from(err))?;
let mut statement = conn.prepare(&query).map_err(|err| Error::from(err))?;
let result = statement.query_row(&[new_path.as_str()], |row| Ok(row.get(0)?));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
}
})
.await
.unwrap();
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(|err| Error::from)?;
let mut statement = conn.prepare(&query).map_err(|err| Error::from)?;
let result = statement.query_row(&[path], |row| Ok(row.get(0)?));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Error::from,
}
.map_err(|err| Error::from(err))
.and_then(|inner_result| inner_result)
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let value_field = self.value_field.clone();
let new_path = path.to_string();
let new_value = value.to_vec();

task::spawn_blocking(move || {
self.blocking_set(path, value);
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
table, key_field, value_field
);
let conn = Connection::open(connection_string).map_err(|err| Error::from(err))?;
let mut statement = conn.prepare(&query).map_err(|err| Error::from(err))?;
statement
.execute(params![new_path, new_value])
.map_err(|err| Error::from(err))?;
Ok(())
})
.await
.unwrap();
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
self.table, self.key_field, self.value_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(|err| Error::from)?;
let mut statement = conn.prepare(&query).map_err(|err| Error::from)?;
statement
.execute(params![path, value])
.map_err(|err| Error::from)?;
Ok(())
.map_err(|err| Error::from(err))
.and_then(|inner_result| inner_result)
}

async fn delete(&self, path: &str) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let new_path = path.to_string();

task::spawn_blocking(move || {
self.blocking_delete(path);
let conn = Connection::open(connection_string).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", table, key_field);
let mut statement = conn.prepare(&query).map_err(|err| Error::from(err))?;
statement
.execute(&[new_path.as_str()])
.map_err(|err| Error::from(err))?;
Ok(())
})
.await
.unwrap();
.map_err(|err| Error::from(err))
.and_then(|inner_result| inner_result)
}
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let conn = Connection::open(self.connection_string.clone()).map_err(|err| Error::from)?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field);
let mut statement = conn.prepare(&query).map_err(|err| Error::from)?;
statement.execute(&[path]).map_err(|err| Error::from)?;
Ok(())
impl From<rusqlite::Error> for Error {
fn from(value: rusqlite::Error) -> Error {
Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value)
}
}

impl From<rusqlite::Error> for Error {
fn from(err: rusqlite::Error) -> Self {
Error::new(ErrorKind::Unexpected, "Sqlite error").set_source(err)
impl From<task::JoinError> for Error {
fn from(value: task::JoinError) -> Error {
Error::new(
ErrorKind::Unexpected,
"unhandled error from sqlite when spawning task",
)
.set_source(value)
}
}

0 comments on commit f2c973a

Please sign in to comment.