From 83a76fba99c104187a26bf6a73cf2733d91de385 Mon Sep 17 00:00:00 2001 From: Manjusaka Date: Thu, 28 Sep 2023 23:57:04 +0800 Subject: [PATCH] feat(service/sqlite) Support sqlite for opendal Signed-off-by: Manjusaka --- .github/workflows/service_test_sqlite.yml | 65 +++++ Cargo.lock | 51 ++++ core/Cargo.toml | 2 + core/src/services/mod.rs | 5 + core/src/services/sqlite/backend.rs | 294 ++++++++++++++++++++++ core/src/services/sqlite/docs.md | 48 ++++ core/src/services/sqlite/mod.rs | 19 ++ core/src/types/scheme.rs | 4 + core/tests/behavior/main.rs | 2 + fixtures/sqlite/data.sql | 23 ++ 10 files changed, 513 insertions(+) create mode 100644 .github/workflows/service_test_sqlite.yml create mode 100644 core/src/services/sqlite/backend.rs create mode 100644 core/src/services/sqlite/docs.md create mode 100644 core/src/services/sqlite/mod.rs create mode 100644 fixtures/sqlite/data.sql diff --git a/.github/workflows/service_test_sqlite.yml b/.github/workflows/service_test_sqlite.yml new file mode 100644 index 00000000000..02051442cc8 --- /dev/null +++ b/.github/workflows/service_test_sqlite.yml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Service Test Sqlite + +on: + push: + branches: + - main + pull_request: + branches: + - main + paths: + - "core/src/**" + - "core/tests/**" + - "!core/src/docs/**" + - "!core/src/services/**" + - "core/src/services/sqlite/**" + - ".github/workflows/service_test_sqlite.yml" + - "fixtures/sqlite/**" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + sqlite: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Setup Sqlite + shell: bash + working-directory: fixtures/sqlite + run: mkdir -p /tmp/opendal && sqlite3 /tmp/opendal/test.db < data.sql + + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + need-nextest: true + + - name: Test + shell: bash + working-directory: core + run: cargo nextest run sqlite --features services-sqlite + env: + OPENDAL_SQLITE_TEST: on + OPENDAL_SQLITE_CONNECTION_STRING: file:///tmp/opendal/test.db + OPENDAL_SQLITE_TABLE: data + OPENDAL_SQLITE_KEY_FIELD: key + OPENDAL_SQLITE_VALUE_FIELD: data diff --git a/Cargo.lock b/Cargo.lock index 2dedfb9cb18..ba94dd0c26a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1904,6 +1904,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "1.9.0" @@ -2340,6 +2346,15 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash 0.7.6", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2368,6 +2383,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "hdfs-sys" version = "0.2.0" @@ -3018,6 +3042,17 @@ dependencies = [ "libz-sys", ] +[[package]] +name = "libsqlite3-sys" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libtest-mimic" version = "0.6.1" @@ -4010,6 +4045,7 @@ dependencies = [ "reqsign", "reqwest", "rocksdb", + "rusqlite", "serde", "serde_json", "sha2", @@ -5656,6 +5692,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rusqlite" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4b1eaf239b47034fb450ee9cdedd7d0226571689d8823030c4b6c2cb407152" +dependencies = [ + "bitflags 1.3.2", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "memchr", + "smallvec", +] + [[package]] name = "rust-ini" version = "0.19.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 0686e24935e..aa01149677a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -177,6 +177,7 @@ services-wasabi = [ services-webdav = [] services-webhdfs = [] services-mysql = ["dep:mysql_async"] +services-sqlite = ["dep:rusqlite"] [lib] bench = false @@ -276,6 +277,7 @@ tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } mysql_async = { version = "0.32.2", optional = true } bb8-postgres = { version = "0.8.1", optional = true } +rusqlite = { version = "0.25.0", optional = true, features = ["bundled"] } [dev-dependencies] criterion = { version = "0.4", features = ["async", "async_tokio"] } diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index d50c2b85354..4e00c19fcff 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -218,3 +218,8 @@ pub use self::atomicserver::Atomicserver; mod mysql; #[cfg(feature = "services-mysql")] pub use self::mysql::Mysql; + +#[cfg(feature = "services-sqlite")] +mod sqlite; +#[cfg(feature = "services-sqlite")] +pub use self::sqlite::Sqlite; diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs new file mode 100644 index 00000000000..9e1ed937c2c --- /dev/null +++ b/core/src/services/sqlite/backend.rs @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use rusqlite::{params, Connection}; +use tokio::task; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::*; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct SqliteBuilder { + connection_string: Option, + + table: Option, + key_field: Option, + value_field: Option, + root: Option, +} + +impl Debug for SqliteBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("SqliteBuilder"); + ds.field("connection_string", &self.connection_string); + ds.field("table", &self.table); + ds.field("key_field", &self.key_field); + ds.field("value_field", &self.value_field); + ds.field("root", &self.root); + ds.finish() + } +} + +impl SqliteBuilder { + /// Set the connection_string of the sqlite service. + /// + /// This connection string is used to connect to the sqlite service. There are url based formats: + /// + /// ## Url + /// + /// This format resembles the url format of the sqlite client. The format is: file://[path]?flag + /// + /// - `file://data.db` + /// + /// For more information, please refer to [Opening A New Database Connection](http://www.sqlite.org/c3ref/open.html) + pub fn connection_string(&mut self, v: &str) -> &mut Self { + if !v.is_empty() { + self.connection_string = Some(v.to_string()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_owned()); + } + self + } + + /// Set the table name of the sqlite service to read/write. + pub fn table(&mut self, table: &str) -> &mut Self { + if !table.is_empty() { + self.table = Some(table.to_string()); + } + self + } + + /// Set the key field name of the sqlite service to read/write. + /// + /// Default to `key` if not specified. + pub fn key_field(&mut self, key_field: &str) -> &mut Self { + if !key_field.is_empty() { + self.key_field = Some(key_field.to_string()); + } + self + } + + /// Set the value field name of the sqlite service to read/write. + /// + /// Default to `value` if not specified. + pub fn value_field(&mut self, value_field: &str) -> &mut Self { + if !value_field.is_empty() { + self.value_field = Some(value_field.to_string()); + } + self + } +} + +impl Builder for SqliteBuilder { + const SCHEME: Scheme = Scheme::Sqlite; + type Accessor = SqliteBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = SqliteBuilder::default(); + map.get("connection_string") + .map(|v| builder.connection_string(v)); + map.get("table").map(|v| builder.table(v)); + map.get("key_field").map(|v| builder.key_field(v)); + map.get("value_field").map(|v| builder.value_field(v)); + map.get("root").map(|v| builder.root(v)); + builder + } + + fn build(&mut self) -> Result { + let connection_string = match self.connection_string.clone() { + Some(v) => v, + None => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "connection_string is required but not set", + ) + .with_context("service", Scheme::Sqlite)) + } + }; + let table = match self.table.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty") + .with_context("service", Scheme::Postgresql)) + } + }; + let key_field = match self.key_field.clone() { + Some(v) => v, + None => "key".to_string(), + }; + let value_field = match self.value_field.clone() { + Some(v) => v, + None => "value".to_string(), + }; + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + Ok(SqliteBackend::new(Adapter { + connection_string, + table, + key_field, + value_field, + }) + .with_root(&root)) + } +} + +pub type SqliteBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + connection_string: String, + + table: String, + key_field: String, + value_field: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("SqliteAdapter"); + ds.field("connection_string", &self.connection_string); + ds.field("table", &self.table); + ds.field("key_field", &self.key_field); + ds.field("value_field", &self.value_field); + ds.finish() + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Sqlite, + &self.table, + Capability { + read: true, + write: true, + create_dir: true, + delete: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let connection_string = self.connection_string.clone(); + let value_field = self.value_field.clone(); + let table = self.table.clone(); + let key_field = self.key_field.clone(); + let cloned_path = path.to_string(); + + task::spawn_blocking(move || { + let query = format!( + "SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1", + value_field, table, key_field + ); + let conn = Connection::open(connection_string).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(Error::from)?; + let result = statement.query_row([cloned_path.as_str()], |row| row.get(0)); + match result { + Ok(v) => Ok(Some(v)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(err) => Err(Error::from(err)), + } + }) + .await + .map_err(Error::from) + .and_then(|inner_result| inner_result) + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let connection_string = self.connection_string.clone(); + let table = self.table.clone(); + let key_field = self.key_field.clone(); + let value_field = self.value_field.clone(); + let cloned_path = path.to_string(); + let cloned_value = value.to_vec(); + + task::spawn_blocking(move || { + let query = format!( + "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)", + table, key_field, value_field + ); + let conn = Connection::open(connection_string).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(Error::from)?; + statement + .execute(params![cloned_path, cloned_value]) + .map_err(Error::from)?; + Ok(()) + }) + .await + .map_err(Error::from) + .and_then(|inner_result| inner_result) + } + + async fn delete(&self, path: &str) -> Result<()> { + let connection_string = self.connection_string.clone(); + let table = self.table.clone(); + let key_field = self.key_field.clone(); + let cloned_path = path.to_string(); + + task::spawn_blocking(move || { + let conn = Connection::open(connection_string).map_err(|err| { + Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err) + })?; + let query = format!("DELETE FROM {} WHERE `{}` = $1", table, key_field); + let mut statement = conn.prepare(&query).map_err(Error::from)?; + statement + .execute([cloned_path.as_str()]) + .map_err(Error::from)?; + Ok(()) + }) + .await + .map_err(Error::from) + .and_then(|inner_result| inner_result) + } +} + +impl From for Error { + fn from(value: rusqlite::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value) + } +} + +impl From for Error { + fn from(value: task::JoinError) -> Error { + Error::new( + ErrorKind::Unexpected, + "unhandled error from sqlite when spawning task", + ) + .set_source(value) + } +} diff --git a/core/src/services/sqlite/docs.md b/core/src/services/sqlite/docs.md new file mode 100644 index 00000000000..4032ba29028 --- /dev/null +++ b/core/src/services/sqlite/docs.md @@ -0,0 +1,48 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `connection_string`: Set the connection string of postgres server +- `table`: Set the table of sqlite +- `key_field`: Set the key field of sqlite +- `value_field`: Set the value field of sqlite + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Sqlite; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Sqlite::default(); + builder.root("/"); + builder.connection_string("file//abc.db"); + builder.table("your_table"); + // key field type in the table should be compatible with Rust's &str like text + builder.key_field("key"); + // value field type in the table should be compatible with Rust's Vec like bytea + builder.value_field("value"); + + let op = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/sqlite/mod.rs b/core/src/services/sqlite/mod.rs new file mode 100644 index 00000000000..31da06964db --- /dev/null +++ b/core/src/services/sqlite/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::SqliteBuilder as Sqlite; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index e2af80f6433..84480bf68b7 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -90,6 +90,8 @@ pub enum Scheme { Postgresql, /// [mysql][crate::services::Mysql]: Mysql services Mysql, + /// [sqlite][crate::services::Sqlite]: Sqlite services + Sqlite, /// [rocksdb][crate::services::Rocksdb]: RocksDB services Rocksdb, /// [s3][crate::services::S3]: AWS S3 alike services. @@ -170,6 +172,7 @@ impl FromStr for Scheme { "memcached" => Ok(Scheme::Memcached), "memory" => Ok(Scheme::Memory), "mysql" => Ok(Scheme::Mysql), + "sqlite" => Ok(Scheme::Sqlite), "mini_moka" => Ok(Scheme::MiniMoka), "moka" => Ok(Scheme::Moka), "obs" => Ok(Scheme::Obs), @@ -237,6 +240,7 @@ impl From for &'static str { Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", Scheme::Tikv => "tikv", + Scheme::Sqlite => "sqlite", Scheme::Custom(v) => v, } } diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index d9fc670437a..695eefe2ce2 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -180,6 +180,8 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-mysql")] tests.extend(behavior_test::()); + #[cfg(feature = "services-sqlite")] + tests.extend(behavior_test::()); // Don't init logging while building operator which may break cargo // nextest output diff --git a/fixtures/sqlite/data.sql b/fixtures/sqlite/data.sql new file mode 100644 index 00000000000..c156a10891a --- /dev/null +++ b/fixtures/sqlite/data.sql @@ -0,0 +1,23 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +CREATE TABLE IF NOT EXISTS `data` ( + `key` TEXT PRIMARY KEY NOT NULL CHECK(length(key) <= 255), + `data` BLOB +);