From b856968c26881a7173b8791bee2173f1e22a9508 Mon Sep 17 00:00:00 2001 From: hoslo Date: Fri, 15 Dec 2023 14:39:19 +0800 Subject: [PATCH] feat(services): add seafile support --- .env.example | 6 + .github/services/seafile/seafile/action.yml | 45 +++ .github/workflows/ci.yml | 1 + bindings/java/Cargo.toml | 2 + bindings/nodejs/Cargo.toml | 2 + bindings/python/Cargo.toml | 2 + core/Cargo.toml | 1 + core/src/raw/chrono_util.rs | 9 + core/src/raw/http_util/multipart.rs | 40 +- core/src/services/mod.rs | 7 + core/src/services/seafile/backend.rs | 341 ++++++++++++++++ core/src/services/seafile/core.rs | 417 ++++++++++++++++++++ core/src/services/seafile/docs.md | 56 +++ core/src/services/seafile/error.rs | 94 +++++ core/src/services/seafile/lister.rs | 119 ++++++ core/src/services/seafile/mod.rs | 25 ++ core/src/services/seafile/writer.rs | 89 +++++ core/src/types/operator/builder.rs | 2 + core/src/types/scheme.rs | 6 + fixtures/seafile/docker-compose-seafile.yml | 62 +++ 20 files changed, 1312 insertions(+), 14 deletions(-) create mode 100644 .github/services/seafile/seafile/action.yml create mode 100644 core/src/services/seafile/backend.rs create mode 100644 core/src/services/seafile/core.rs create mode 100644 core/src/services/seafile/docs.md create mode 100644 core/src/services/seafile/error.rs create mode 100644 core/src/services/seafile/lister.rs create mode 100644 core/src/services/seafile/mod.rs create mode 100644 core/src/services/seafile/writer.rs create mode 100644 fixtures/seafile/docker-compose-seafile.yml diff --git a/.env.example b/.env.example index 6277e05ae61..f95a10298db 100644 --- a/.env.example +++ b/.env.example @@ -180,3 +180,9 @@ OPENDAL_HUGGINGFACE_REPO_TYPE=dataset OPENDAL_HUGGINGFACE_REPO_ID=opendal/huggingface-testdata OPENDAL_HUGGINGFACE_REVISION=main OPENDAL_HUGGINGFACE_ROOT=/testdata/ +# seafile +OPENDAL_SEAFILE_ROOT=/path/to/dir +OPENDAL_SEAFILE_ENDPOINT= +OPENDAL_SEAFILE_USERNAME= +OPENDAL_SEAFILE_PASSWORD= +OPENDAL_SEAFILE_REPO_NAME= \ No newline at end of file diff --git a/.github/services/seafile/seafile/action.yml b/.github/services/seafile/seafile/action.yml new file mode 100644 index 00000000000..c504bc5b19b --- /dev/null +++ b/.github/services/seafile/seafile/action.yml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: seafile +description: "Behavior test for Seafile" + +runs: + using: "composite" + steps: + - name: Setup Seafile service + shell: bash + working-directory: fixtures/seafile + run: | + docker compose -f docker-compose-seafile.yml up -d --wait + + - name: Create test token and setup test library + shell: bash + run: | + token=$(curl --location --request POST -d "username=me@example.com&password=asecret" http://127.0.0.1:80/api2/auth-token/ | awk -F '"' '/token/{print $4}') + curl --location --request POST -d 'name=test' -H "Authorization: Token $token" http://127.0.0.1:80/api2/repos/ + + - name: Set environment variables + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_SEAFILE_ENDPOINT=http://127.0.0.1:80 + OPENDAL_SEAFILE_USERNAME=me@example.com + OPENDAL_SEAFILE_PASSWORD=asecret + OPENDAL_SEAFILE_REPO_NAME=test + OPENDAL_SEAFILE_ROOT=/ + EOF diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73b90e9814d..3d6d2c24cbe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -279,6 +279,7 @@ jobs: # TODO: we need to find ways to using pre-install rocksdb library # services-rocksdb services-s3 + services-seafile # TODO: sftp is known to not work on windows, waiting for https://github.com/apache/incubator-opendal/issues/2963 # services-sftp services-sled diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index b5e98623511..de42caf066c 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -90,6 +90,7 @@ services-all = [ "services-swift", "services-alluxio", "services-b2", + "services-seafile", ] # Default services provided by opendal. @@ -137,6 +138,7 @@ services-redis = ["opendal/services-redis"] services-rocksdb = ["opendal/services-rocksdb"] services-sftp = ["opendal/services-sftp"] services-sled = ["opendal/services-sled"] +services-seafile = ["opendal/services-seafile"] services-sqlite = ["opendal/services-sqlite"] services-supabase = ["opendal/services-supabase"] services-swift = ["opendal/services-swift"] diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index 4a1a1f6380c..a812d69ec2f 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -85,6 +85,7 @@ services-all = [ "services-libsql", "services-alluxio", "services-b2", + "services-seafile", ] # Default services provided by opendal. @@ -133,6 +134,7 @@ services-rocksdb = ["opendal/services-rocksdb"] services-sftp = ["opendal/services-sftp"] services-sled = ["opendal/services-sled"] services-sqlite = ["opendal/services-sqlite"] +services-seafile = ["opendal/services-seafile"] services-supabase = ["opendal/services-supabase"] services-swift = ["opendal/services-swift"] services-tikv = ["opendal/services-tikv"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index beea4e11a38..5de87a0814c 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -84,6 +84,7 @@ services-all = [ "services-libsql", "services-alluxio", "services-b2", + "services-seafile", ] # Default services provided by opendal. @@ -132,6 +133,7 @@ services-rocksdb = ["opendal/services-rocksdb"] services-sftp = ["opendal/services-sftp"] services-sled = ["opendal/services-sled"] services-sqlite = ["opendal/services-sqlite"] +services-seafile = ["opendal/services-seafile"] services-supabase = ["opendal/services-supabase"] services-swift = ["opendal/services-swift"] services-tikv = ["opendal/services-tikv"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 301dc99e7b4..dac151d427e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -126,6 +126,7 @@ services-azdls = [ ] services-azfile = [] services-b2 = [] +services-seafile = [] services-cacache = ["dep:cacache"] services-cloudflare-kv = [] services-cos = [ diff --git a/core/src/raw/chrono_util.rs b/core/src/raw/chrono_util.rs index 2039ee19e2e..b15a3f9356c 100644 --- a/core/src/raw/chrono_util.rs +++ b/core/src/raw/chrono_util.rs @@ -53,3 +53,12 @@ pub fn parse_datetime_from_from_timestamp_millis(s: i64) -> Result Ok(st.into()) } + +/// parse datetime from given timestamp +pub fn parse_datetime_from_from_timestamp(s: i64) -> Result> { + let st = UNIX_EPOCH + .checked_add(Duration::from_secs(s as u64)) + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "input timestamp overflow"))?; + + Ok(st.into()) +} diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index a6c6f765278..55c502b9ab4 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -291,7 +291,19 @@ impl Part for FormDataPart { // Building pre-content. for (k, v) in self.headers.iter() { - bs.extend_from_slice(k.as_str().as_bytes()); + // Trick! + // + // Seafile could not recognize header names like `content-disposition` + // and requires to use `Content-Disposition`. So we hardcode the part + // headers name here. + match k.as_str() { + "content-disposition" => { + bs.extend_from_slice("Content-Disposition".as_bytes()); + } + _ => { + bs.extend_from_slice(k.as_str().as_bytes()); + } + } bs.extend_from_slice(b": "); bs.extend_from_slice(v.as_bytes()); bs.extend_from_slice(b"\r\n"); @@ -728,11 +740,11 @@ mod tests { assert_eq!(size, bs.len() as u64); let expected = "--lalala\r\n\ - content-disposition: form-data; name=\"foo\"\r\n\ + Content-Disposition: form-data; name=\"foo\"\r\n\ \r\n\ bar\r\n\ --lalala\r\n\ - content-disposition: form-data; name=\"hello\"\r\n\ + Content-Disposition: form-data; name=\"hello\"\r\n\ \r\n\ world\r\n\ --lalala--\r\n"; @@ -764,48 +776,48 @@ mod tests { assert_eq!(size, bs.len() as u64); let expected = r#"--9431149156168 -content-disposition: form-data; name="key" +Content-Disposition: form-data; name="key" user/eric/MyPicture.jpg --9431149156168 -content-disposition: form-data; name="acl" +Content-Disposition: form-data; name="acl" public-read --9431149156168 -content-disposition: form-data; name="success_action_redirect" +Content-Disposition: form-data; name="success_action_redirect" https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html --9431149156168 -content-disposition: form-data; name="content-type" +Content-Disposition: form-data; name="content-type" image/jpeg --9431149156168 -content-disposition: form-data; name="x-amz-meta-uuid" +Content-Disposition: form-data; name="x-amz-meta-uuid" 14365123651274 --9431149156168 -content-disposition: form-data; name="x-amz-meta-tag" +Content-Disposition: form-data; name="x-amz-meta-tag" Some,Tag,For,Picture --9431149156168 -content-disposition: form-data; name="AWSAccessKeyId" +Content-Disposition: form-data; name="AWSAccessKeyId" AKIAIOSFODNN7EXAMPLE --9431149156168 -content-disposition: form-data; name="Policy" +Content-Disposition: form-data; name="Policy" eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo= --9431149156168 -content-disposition: form-data; name="Signature" +Content-Disposition: form-data; name="Signature" 0RavWzkygo6QX9caELEqKi9kDbU= --9431149156168 -content-disposition: form-data; name="file" +Content-Disposition: form-data; name="file" content-type: image/jpeg ...file content... --9431149156168 -content-disposition: form-data; name="submit" +Content-Disposition: form-data; name="submit" Upload to Amazon S3 --9431149156168-- diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 0ae125537a4..086db17cd30 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -305,3 +305,10 @@ mod b2; pub use b2::B2Config; #[cfg(feature = "services-b2")] pub use b2::B2; + +#[cfg(feature = "services-seafile")] +mod seafile; +#[cfg(feature = "services-seafile")] +pub use seafile::Seafile; +#[cfg(feature = "services-seafile")] +pub use seafile::SeafileConfig; diff --git a/core/src/services/seafile/backend.rs b/core/src/services/seafile/backend.rs new file mode 100644 index 00000000000..63f2f71220a --- /dev/null +++ b/core/src/services/seafile/backend.rs @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; +use tokio::sync::RwLock; + +use super::core::parse_dir_detail; +use super::core::parse_file_detail; +use super::core::SeafileCore; +use super::error::parse_error; +use super::lister::SeafileLister; +use super::writer::SeafileWriter; +use super::writer::SeafileWriters; +use crate::raw::*; +use crate::services::seafile::core::SeafileSigner; +use crate::*; + +/// Config for backblaze seafile services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct SeafileConfig { + /// root of this backend. + /// + /// All operations will happen under this root. + pub root: Option, + /// endpoint address of this backend. + pub endpoint: Option, + /// username of this backend. + pub username: Option, + /// password of this backend. + pub password: Option, + /// repo_name of this backend. + /// + /// required. + pub repo_name: String, +} + +impl Debug for SeafileConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("SeafileConfig"); + + d.field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("username", &self.username) + .field("repo_name", &self.repo_name); + + d.finish_non_exhaustive() + } +} + +/// [seafile](https://www.seafile.com) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct SeafileBuilder { + config: SeafileConfig, + + http_client: Option, +} + +impl Debug for SeafileBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("SeafileBuilder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl SeafileBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// endpoint of this backend. + /// + /// It is required. e.g. `http://127.0.0.1:80` + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + self.config.endpoint = if endpoint.is_empty() { + None + } else { + Some(endpoint.to_string()) + }; + + self + } + + /// username of this backend. + /// + /// It is required. e.g. `me@example.com` + pub fn username(&mut self, username: &str) -> &mut Self { + self.config.username = if username.is_empty() { + None + } else { + Some(username.to_string()) + }; + + self + } + + /// password of this backend. + /// + /// It is required. e.g. `asecret` + pub fn password(&mut self, password: &str) -> &mut Self { + self.config.password = if password.is_empty() { + None + } else { + Some(password.to_string()) + }; + + self + } + + /// Set repo name of this backend. + /// + /// It is required. e.g. `myrepo` + pub fn repo_name(&mut self, repo_name: &str) -> &mut Self { + self.config.repo_name = repo_name.to_string(); + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for SeafileBuilder { + const SCHEME: Scheme = Scheme::Seafile; + type Accessor = SeafileBackend; + + /// Converts a HashMap into an SeafileBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of SeafileBuilder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = SeafileConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an SeafileBuilder instance with the deserialized config. + SeafileBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of SeafileBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle bucket. + if self.config.repo_name.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "repo_name is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Seafile)); + } + + debug!("backend use repo_name {}", &self.config.repo_name); + + let endpoint = match &self.config.endpoint { + Some(endpoint) => Ok(endpoint.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Seafile)), + }?; + + let username = match &self.config.username { + Some(username) => Ok(username.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Seafile)), + }?; + + let password = match &self.config.password { + Some(password) => Ok(password.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Seafile)), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Seafile) + })? + }; + + Ok(SeafileBackend { + core: Arc::new(SeafileCore { + root, + endpoint, + username, + password, + repo_name: self.config.repo_name.clone(), + signer: Arc::new(RwLock::new(SeafileSigner::default())), + client, + }), + }) + } +} + +/// Backend for seafile services. +#[derive(Debug, Clone)] +pub struct SeafileBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for SeafileBackend { + type Reader = IncomingAsyncBody; + + type BlockingReader = (); + + type Writer = SeafileWriters; + + type BlockingWriter = (); + + type Lister = oio::PageLister; + + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Seafile) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + read: true, + read_can_next: true, + + write: true, + write_can_empty: true, + + delete: true, + + list: true, + + ..Default::default() + }); + + am + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.download_file(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let size = parse_content_length(resp.headers())?; + Ok((RpRead::new().with_size(size), resp.into_body())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let metadata = if path.ends_with('/') { + let dir_detail = self.core.dir_detail(path).await?; + parse_dir_detail(dir_detail) + } else { + let file_detail = self.core.file_detail(path).await?; + + parse_file_detail(file_detail) + }; + + metadata.map(RpStat::new) + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let w = SeafileWriter::new(self.core.clone(), args, path.to_string()); + let w = oio::OneShotWriter::new(w); + + Ok((RpWrite::default(), w)) + } + + async fn delete(&self, path: &str, _args: OpDelete) -> Result { + let _ = self.core.delete(path).await?; + + Ok(RpDelete::default()) + } + + async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { + let l = SeafileLister::new(self.core.clone(), path); + Ok((RpList::default(), oio::PageLister::new(l))) + } +} diff --git a/core/src/services/seafile/core.rs b/core/src/services/seafile/core.rs new file mode 100644 index 00000000000..a7cb8af7f5b --- /dev/null +++ b/core/src/services/seafile/core.rs @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Bytes; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; +use serde::Deserialize; +use std::sync::Arc; +use tokio::sync::RwLock; + +use std::fmt::Debug; +use std::fmt::Formatter; + +use crate::raw::*; +use crate::*; + +use super::error::parse_error; + +/// Core of [seafile](https://www.seafile.com) services support. +#[derive(Clone)] +pub struct SeafileCore { + /// The root of this core. + pub root: String, + /// The endpoint of this backend. + pub endpoint: String, + /// The username of this backend. + pub username: String, + /// The password id of this backend. + pub password: String, + /// The repo name of this backend. + pub repo_name: String, + + /// signer of this backend. + pub signer: Arc>, + + pub client: HttpClient, +} + +impl Debug for SeafileCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("username", &self.username) + .field("repo_name", &self.repo_name) + .finish_non_exhaustive() + } +} + +impl SeafileCore { + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + /// get auth info + pub async fn get_auth_info(&self) -> Result { + { + let signer = self.signer.read().await; + + if !signer.auth_info.token.is_empty() { + let auth_info = signer.auth_info.clone(); + return Ok(auth_info.clone()); + } + } + + { + let mut signer = self.signer.write().await; + let body = format!( + "username={}&password={}", + percent_encode_path(&self.username), + percent_encode_path(&self.password) + ); + let req = Request::post(format!("{}/api2/auth-token/", self.endpoint)) + .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(AsyncBody::Bytes(Bytes::from(body))) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let auth_response = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + signer.auth_info = AuthInfo { + token: auth_response.token, + repo_id: "".to_string(), + }; + } + _ => { + return Err(parse_error(resp).await?); + } + } + + let url = format!("{}/api2/repos", self.endpoint); + + let req = Request::get(url) + .header( + header::AUTHORIZATION, + format!("Token {}", signer.auth_info.token), + ) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let list_library_response = + serde_json::from_slice::>(resp_body) + .map_err(new_json_deserialize_error)?; + + for library in list_library_response { + if library.name == self.repo_name { + signer.auth_info.repo_id = library.id; + break; + } + } + + // repo not found + if signer.auth_info.repo_id.is_empty() { + return Err(Error::new( + ErrorKind::NotFound, + &format!("repo {} not found", self.repo_name), + )); + } + } + _ => { + return Err(parse_error(resp).await?); + } + } + Ok(signer.auth_info.clone()) + } + } +} + +impl SeafileCore { + /// get upload url + pub async fn get_upload_url(&self) -> Result { + let auth_info = self.get_auth_info().await?; + + let req = Request::get(format!( + "{}/api2/repos/{}/upload-link/", + self.endpoint, auth_info.repo_id + )); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let upload_url = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(upload_url) + } + _ => Err(parse_error(resp).await?), + } + } + + /// get download + pub async fn get_download_url(&self, path: &str) -> Result { + let path = build_abs_path(&self.root, path); + let path = percent_encode_path(&path); + + let auth_info = self.get_auth_info().await?; + + let req = Request::get(format!( + "{}/api2/repos/{}/file/?p={}", + self.endpoint, auth_info.repo_id, path + )); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let download_url = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + + Ok(download_url) + } + _ => Err(parse_error(resp).await?), + } + } + + /// download file + pub async fn download_file(&self, path: &str) -> Result> { + let download_url = self.get_download_url(path).await?; + + let req = Request::get(download_url); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => Ok(resp), + _ => Err(parse_error(resp).await?), + } + } + + /// file detail + pub async fn file_detail(&self, path: &str) -> Result { + let path = build_abs_path(&self.root, path); + let path = percent_encode_path(&path); + + let auth_info = self.get_auth_info().await?; + + let req = Request::get(format!( + "{}/api2/repos/{}/file/detail/?p={}", + self.endpoint, auth_info.repo_id, path + )); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let file_detail = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(file_detail) + } + _ => Err(parse_error(resp).await?), + } + } + + /// dir detail + pub async fn dir_detail(&self, path: &str) -> Result { + let path = build_abs_path(&self.root, path); + let path = percent_encode_path(&path); + + let auth_info = self.get_auth_info().await?; + + let req = Request::get(format!( + "{}/api/v2.1/repos/{}/dir/detail/?path={}", + self.endpoint, auth_info.repo_id, path + )); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let dir_detail = serde_json::from_slice::(resp_body) + .map_err(new_json_deserialize_error)?; + Ok(dir_detail) + } + _ => Err(parse_error(resp).await?), + } + } + + /// create dir + pub async fn create_dir(&self, path: &str) -> Result<()> { + let path = build_abs_path(&self.root, path); + let path = format!("/{}", &path[..path.len() - 1]); + let path = percent_encode_path(&path); + + let auth_info = self.get_auth_info().await?; + + let req = Request::post(format!( + "{}/api2/repos/{}/dir/?p={}", + self.endpoint, auth_info.repo_id, path, + )); + + let body = "operation=mkdir".to_string(); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(AsyncBody::Bytes(Bytes::from(body))) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::CREATED => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + /// delete file or dir + pub async fn delete(&self, path: &str) -> Result<()> { + let path = build_abs_path(&self.root, path); + let path = percent_encode_path(&path); + + let auth_info = self.get_auth_info().await?; + + let url = if path.ends_with('/') { + format!( + "{}/api2/repos/{}/dir/?p={}", + self.endpoint, auth_info.repo_id, path + ) + } else { + format!( + "{}/api2/repos/{}/file/?p={}", + self.endpoint, auth_info.repo_id, path + ) + }; + + let req = Request::delete(url); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} + +#[derive(Deserialize)] +pub struct AuthTokenResponse { + pub token: String, +} + +#[derive(Deserialize)] +pub struct FileDetail { + pub last_modified: String, + pub size: u64, +} + +#[derive(Debug, Deserialize)] +pub struct DirDetail { + mtime: String, +} + +pub fn parse_dir_detail(dir_detail: DirDetail) -> Result { + let mut md = Metadata::new(EntryMode::DIR); + + md.set_last_modified(parse_datetime_from_rfc3339(&dir_detail.mtime)?); + + Ok(md) +} + +pub fn parse_file_detail(file_detail: FileDetail) -> Result { + let mut md = Metadata::new(EntryMode::FILE); + + md.set_content_length(file_detail.size); + md.set_last_modified(parse_datetime_from_rfc3339(&file_detail.last_modified)?); + + Ok(md) +} + +#[derive(Clone, Default)] +pub struct SeafileSigner { + pub auth_info: AuthInfo, +} + +#[derive(Clone, Default)] +pub struct AuthInfo { + /// The repo id of this auth info. + pub repo_id: String, + /// The token of this auth info, + pub token: String, +} + +#[derive(Deserialize)] +pub struct ListLibraryResponse { + pub name: String, + pub id: String, +} diff --git a/core/src/services/seafile/docs.md b/core/src/services/seafile/docs.md new file mode 100644 index 00000000000..4690405359f --- /dev/null +++ b/core/src/services/seafile/docs.md @@ -0,0 +1,56 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [x] list +- [x] scan +- [ ] presign +- [ ] blocking + +## Configuration + +- `root`: Set the work directory for backend +- `endpoint`: Seafile endpoint address +- `username` Seafile username +- `password` Seafile password +- `repo_name` Seafile repo name + +You can refer to [`SeafileBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Seafile; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // create backend builder + let mut builder = Seafile::default(); + + // set the storage bucket for OpenDAL + builder.root("/"); + // set the endpoint for OpenDAL + builder.endpoint("http://127.0.0.1:80"); + // set the username for OpenDAL + builder.username("xxxxxxxxxx"); + // set the password name for OpenDAL + builder.password("opendal"); + // set the repo_name for OpenDAL + builder.repo_name("xxxxxxxxxxxxx"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/seafile/error.rs b/core/src/services/seafile/error.rs new file mode 100644 index 00000000000..05a0fed9991 --- /dev/null +++ b/core/src/services/seafile/error.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// the error response of seafile +#[derive(Default, Debug, Deserialize)] +#[allow(dead_code)] +struct SeafileError { + error_msg: String, +} + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, _retryable) = match parts.status.as_u16() { + 400 => (ErrorKind::InvalidInput, false), + 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 520 => (ErrorKind::Unexpected, false), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, _seafile_err) = serde_json::from_reader::<_, SeafileError>(bs.clone().reader()) + .map(|seafile_err| (format!("{seafile_err:?}"), Some(seafile_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + Ok(err) +} + +#[cfg(test)] +mod test { + use futures::stream; + use http::StatusCode; + + use super::*; + + #[tokio::test] + async fn test_parse_error() { + let err_res = vec![ + ( + r#"{"error_msg": "Permission denied"}"#, + ErrorKind::PermissionDenied, + StatusCode::FORBIDDEN, + ), + ( + r#"{"error_msg": "Folder /e982e75a-fead-487c-9f41-63094d9bf0de/a9d867b9-778d-4612-b674-47e674c14c28/ not found."}"#, + ErrorKind::NotFound, + StatusCode::NOT_FOUND, + ), + ]; + + for res in err_res { + let bs = bytes::Bytes::from(res.0); + let body = IncomingAsyncBody::new( + Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))), + None, + ); + let resp = Response::builder().status(res.2).body(body).unwrap(); + + let err = parse_error(resp).await; + + assert!(err.is_ok()); + assert_eq!(err.unwrap().kind(), res.1); + } + } +} diff --git a/core/src/services/seafile/lister.rs b/core/src/services/seafile/lister.rs new file mode 100644 index 00000000000..c1ee6b94705 --- /dev/null +++ b/core/src/services/seafile/lister.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::{header, Request, StatusCode}; +use serde::Deserialize; + +use super::core::SeafileCore; +use super::error::parse_error; + +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::*; + +pub struct SeafileLister { + core: Arc, + + path: String, +} + +impl SeafileLister { + pub(super) fn new(core: Arc, path: &str) -> Self { + SeafileLister { + core, + path: path.to_string(), + } + } +} + +#[async_trait] +impl oio::PageList for SeafileLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let path = build_rooted_abs_path(&self.core.root, &self.path); + + let auth_info = self.core.get_auth_info().await?; + + let url = format!( + "{}/api2/repos/{}/dir/?p={}", + self.core.endpoint, + auth_info.repo_id, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let req = req + .header(header::AUTHORIZATION, format!("Token {}", auth_info.token)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let infos = serde_json::from_slice::>(resp_body) + .map_err(new_json_deserialize_error)?; + + for info in infos { + if !info.name.is_empty() { + let rel_path = + build_rel_path(&self.core.root, &format!("{}{}", path, info.name)); + + let entry = if info.type_field == "file" { + let meta = Metadata::new(EntryMode::FILE) + .with_last_modified(parse_datetime_from_from_timestamp(info.mtime)?) + .with_content_length(info.size.unwrap_or(0)); + Entry::new(&rel_path, meta) + } else { + let path = format!("{}/", rel_path); + Entry::new(&path, Metadata::new(EntryMode::DIR)) + }; + + ctx.entries.push_back(entry); + } + } + + ctx.done = true; + + Ok(()) + } + // return nothing when not exist + StatusCode::NOT_FOUND => { + ctx.done = true; + Ok(()) + } + _ => { + return Err(parse_error(resp).await?); + } + } + } +} + +#[derive(Debug, Deserialize)] +struct Info { + #[serde(rename = "type")] + pub type_field: String, + pub mtime: i64, + pub size: Option, + pub name: String, +} diff --git a/core/src/services/seafile/mod.rs b/core/src/services/seafile/mod.rs new file mode 100644 index 00000000000..dc9e1ccce07 --- /dev/null +++ b/core/src/services/seafile/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::SeafileBuilder as Seafile; +pub use backend::SeafileConfig; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/seafile/writer.rs b/core/src/services/seafile/writer.rs new file mode 100644 index 00000000000..8c7c0ce9db6 --- /dev/null +++ b/core/src/services/seafile/writer.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::{header, Request, StatusCode}; + +use super::core::SeafileCore; +use super::error::parse_error; +use crate::raw::*; +use crate::*; + +pub type SeafileWriters = oio::OneShotWriter; + +pub struct SeafileWriter { + core: Arc, + _op: OpWrite, + path: String, +} + +impl SeafileWriter { + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + SeafileWriter { + core, + _op: op, + path, + } + } +} + +#[async_trait] +impl oio::OneShotWrite for SeafileWriter { + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + let path = build_abs_path(&self.core.root, &self.path); + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); + + let upload_url = self.core.get_upload_url().await?; + + let req = Request::post(upload_url); + + let paths = path.split('/').collect::>(); + let filename = paths[paths.len() - 1]; + let relative_path = path.replace(filename, ""); + + let file_part = FormDataPart::new("file") + .header( + header::CONTENT_DISPOSITION, + format!("form-data; name=\"file\"; filename=\"{filename}\"") + .parse() + .unwrap(), + ) + .stream(bs.len() as u64, Box::new(bs)); + + let multipart = Multipart::new() + .part(file_part) + .part(FormDataPart::new("parent_dir").content("/")) + .part(FormDataPart::new("relative_path").content(relative_path.clone())) + .part(FormDataPart::new("replace").content("1")); + + let req = multipart.apply(req)?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 785b8622964..c5f1f27b2c1 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -229,6 +229,8 @@ impl Operator { Scheme::Rocksdb => Self::from_map::(map)?.finish(), #[cfg(feature = "services-s3")] Scheme::S3 => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-seafile")] + Scheme::Seafile => Self::from_map::(map)?.finish(), #[cfg(feature = "services-sftp")] Scheme::Sftp => Self::from_map::(map)?.finish(), #[cfg(feature = "services-sled")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index bdc8630deae..295de6a9630 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -40,6 +40,8 @@ pub enum Scheme { Azdls, /// [B2][crate::services::B2]: Backblaze B2 Services. B2, + /// [Seafile][crate::services::Seafile]: Seafile Services. + Seafile, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. @@ -239,6 +241,8 @@ impl Scheme { Scheme::Rocksdb, #[cfg(feature = "services-s3")] Scheme::S3, + #[cfg(feature = "services-seafile")] + Scheme::Seafile, #[cfg(feature = "services-sftp")] Scheme::Sftp, #[cfg(feature = "services-sled")] @@ -326,6 +330,7 @@ impl FromStr for Scheme { "redis" => Ok(Scheme::Redis), "rocksdb" => Ok(Scheme::Rocksdb), "s3" => Ok(Scheme::S3), + "seafile" => Ok(Scheme::Seafile), "sftp" => Ok(Scheme::Sftp), "sled" => Ok(Scheme::Sled), "supabase" => Ok(Scheme::Supabase), @@ -382,6 +387,7 @@ impl From for &'static str { Scheme::Redis => "redis", Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3", + Scheme::Seafile => "seafile", Scheme::Sftp => "sftp", Scheme::Sled => "sled", Scheme::Supabase => "supabase", diff --git a/fixtures/seafile/docker-compose-seafile.yml b/fixtures/seafile/docker-compose-seafile.yml new file mode 100644 index 00000000000..dc883f6a991 --- /dev/null +++ b/fixtures/seafile/docker-compose-seafile.yml @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: "3.8" +services: + db: + image: mariadb:10.11 + container_name: seafile-mysql + environment: + - MYSQL_ROOT_PASSWORD=db_dev # Requested, set the root's password of MySQL service. + - MYSQL_LOG_CONSOLE=true + networks: + - seafile-net + + memcached: + image: memcached:1.6.18 + container_name: seafile-memcached + entrypoint: memcached -m 256 + networks: + - seafile-net + + seafile: + image: seafileltd/seafile-mc:latest + container_name: seafile + ports: + - "80:80" + healthcheck: + test: curl --fail http://127.0.0.1:80/ || exit 1 + interval: 10s + timeout: 30s + retries: 5 + start_period: 60s + environment: + - DB_HOST=db + - DB_ROOT_PASSWD=db_dev # Requested, the value should be root's password of MySQL service. + - TIME_ZONE=Asia/Shanghai # Optional, default is UTC. Should be uncomment and set to your local time zone. + - SEAFILE_ADMIN_EMAIL=me@example.com # Specifies Seafile admin user, default is 'me@example.com'. + - SEAFILE_ADMIN_PASSWORD=asecret # Specifies Seafile admin password, default is 'asecret'. + - SEAFILE_SERVER_LETSENCRYPT=false # Whether to use https or not. + - SEAFILE_SERVER_HOSTNAME=127.0.0.1:80 # Specifies your host name if https is enabled. + depends_on: + - db + - memcached + networks: + - seafile-net + +networks: + seafile-net: