diff --git a/.env.example b/.env.example index 448f94ce194..0f2a3eb82b0 100644 --- a/.env.example +++ b/.env.example @@ -204,3 +204,8 @@ OPENDAL_PCLOUD_PASSWORD= # yandex-disk OPENDAL_YANDEX_DISK_ROOT=/path/to/dir OPENDAL_YANDEX_DISK_ACCESS_TOKEN= +# koofr +OPENDAL_KOOFR_DISK_ROOT=/path/to/dir +OPENDAL_KOOFR_ENDPOINT= +OPENDAL_KOOFR_EMAIL= +OPENDAL_KOOFR_PASSWORD=> \ No newline at end of file diff --git a/core/Cargo.toml b/core/Cargo.toml index 356c30d3c7e..51cc1b025a5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -154,6 +154,7 @@ services-http = [] services-huggingface = [] services-ipfs = ["dep:prost"] services-ipmfs = [] +services-koofr = [] services-libsql = ["dep:hrana-client-proto"] services-memcached = ["dep:bb8"] services-memory = [] diff --git a/core/src/services/koofr/backend.rs b/core/src/services/koofr/backend.rs new file mode 100644 index 00000000000..6e19d9a7e78 --- /dev/null +++ b/core/src/services/koofr/backend.rs @@ -0,0 +1,396 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use tokio::sync::Mutex; +use tokio::sync::OnceCell; + +use super::core::File; +use super::core::KoofrCore; +use super::core::KoofrSigner; +use super::error::parse_error; +use super::lister::KoofrLister; +use super::writer::KoofrWriter; +use super::writer::KoofrWriters; +use crate::raw::*; +use crate::*; + +/// Config for backblaze Koofr services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct KoofrConfig { + /// root of this backend. + /// + /// All operations will happen under this root. + pub root: Option, + /// Koofr endpoint. + pub endpoint: String, + /// Koofr email. + pub email: String, + /// password of this backend. + pub password: Option, +} + +impl Debug for KoofrConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Config"); + + ds.field("root", &self.root); + ds.field("email", &self.email); + + ds.finish() + } +} + +/// [Koofr](https://app.koofr.net/) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct KoofrBuilder { + config: KoofrConfig, + + http_client: Option, +} + +impl Debug for KoofrBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("KoofrBuilder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl KoofrBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// endpoint. + /// + /// It is required. e.g. `https://api.koofr.net/` + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + self.config.endpoint = endpoint.to_string(); + + self + } + + /// email. + /// + /// It is required. e.g. `test@example.com` + pub fn email(&mut self, email: &str) -> &mut Self { + self.config.email = email.to_string(); + + self + } + + /// Koofr app password. + /// + /// Go to https://app.koofr.net/app/admin/preferences/password. + /// Click "Generate Password" button to generate a new password. + pub fn password(&mut self, password: &str) -> &mut Self { + self.config.password = if password.is_empty() { + None + } else { + Some(password.to_string()) + }; + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for KoofrBuilder { + const SCHEME: Scheme = Scheme::Koofr; + type Accessor = KoofrBackend; + + /// Converts a HashMap into an KoofrBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of KoofrBuilder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = KoofrConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an KoofrBuilder instance with the deserialized config. + KoofrBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of KoofrBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle endpoint. + if self.config.endpoint.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Koofr)); + } + + debug!("backend use endpoint {}", &self.config.endpoint); + + // Handle email. + if self.config.email.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "email is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Koofr)); + } + + debug!("backend use email {}", &self.config.email); + + let password = match &self.config.password { + Some(password) => Ok(password.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::Koofr)), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Koofr) + })? + }; + + let signer = Arc::new(Mutex::new(KoofrSigner::default())); + + Ok(KoofrBackend { + core: Arc::new(KoofrCore { + root, + endpoint: self.config.endpoint.clone(), + email: self.config.email.clone(), + password, + mount_id: OnceCell::new(), + signer, + client, + }), + }) + } +} + +/// Backend for Koofr services. +#[derive(Debug, Clone)] +pub struct KoofrBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for KoofrBackend { + type Reader = IncomingAsyncBody; + type Writer = KoofrWriters; + type Lister = oio::PageLister; + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Koofr) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + create_dir: true, + + read: true, + + write: true, + write_can_empty: true, + + delete: true, + rename: true, + copy: true, + + list: true, + + ..Default::default() + }); + + am + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + self.core.ensure_dir_exists(path).await?; + self.core + .create_dir(&build_abs_path(&self.core.root, path)) + .await?; + Ok(RpCreateDir::default()) + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let path = build_rooted_abs_path(&self.core.root, path); + let resp = self.core.info(&path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let file: File = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + let mode = if file.ty == "dir" { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + let mut md = Metadata::new(mode); + + md.set_content_length(file.size) + .set_content_type(&file.content_type) + .set_last_modified(parse_datetime_from_from_timestamp_millis(file.modified)?); + + Ok(RpStat::new(md)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.get(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let size = parse_content_length(resp.headers())?; + let range = parse_content_range(resp.headers())?; + Ok(( + RpRead::new().with_size(size).with_range(range), + resp.into_body(), + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let writer = KoofrWriter::new(self.core.clone(), path.to_string()); + + let w = oio::OneShotWriter::new(writer); + + Ok((RpWrite::default(), w)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let resp = self.core.remove(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpDelete::default()), + // Allow 404 when deleting a non-existing object + StatusCode::NOT_FOUND => Ok(RpDelete::default()), + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { + let l = KoofrLister::new(self.core.clone(), path); + Ok((RpList::default(), oio::PageLister::new(l))) + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + self.core.ensure_dir_exists(to).await?; + let resp = self.core.remove(to).await?; + + let status = resp.status(); + + if status != StatusCode::OK && status != StatusCode::NOT_FOUND { + return Err(parse_error(resp).await?); + } + + let resp = self.core.copy(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(RpCopy::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + self.core.ensure_dir_exists(to).await?; + let resp = self.core.remove(to).await?; + + let status = resp.status(); + + if status != StatusCode::OK && status != StatusCode::NOT_FOUND { + return Err(parse_error(resp).await?); + } + + let resp = self.core.move_object(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + + Ok(RpRename::default()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs new file mode 100644 index 00000000000..dd30bed7d32 --- /dev/null +++ b/core/src/services/koofr/core.rs @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use bytes::Bytes; +use http::header; +use http::request; +use http::Request; +use http::Response; +use http::StatusCode; +use serde::Deserialize; +use serde_json::json; +use tokio::sync::Mutex; +use tokio::sync::OnceCell; + +use crate::raw::*; +use crate::*; + +use super::error::parse_error; + +#[derive(Clone)] +pub struct KoofrCore { + /// The root of this core. + pub root: String, + /// The endpoint of this backend. + pub endpoint: String, + /// Koofr email + pub email: String, + /// Koofr password + pub password: String, + + /// signer of this backend. + pub signer: Arc>, + + // Koofr mount_id. + pub mount_id: OnceCell, + + pub client: HttpClient, +} + +impl Debug for KoofrCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("email", &self.email) + .finish_non_exhaustive() + } +} + +impl KoofrCore { + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + pub async fn get_mount_id(&self) -> Result<&String> { + self.mount_id + .get_or_try_init(|| async { + let req = Request::get(format!("{}/api/v2/mounts", self.endpoint)); + + let req = self.sign(req).await?; + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + if status != StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let resp: MountsResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + for mount in resp.mounts { + if mount.is_primary { + return Ok(mount.id); + } + } + + Err(Error::new(ErrorKind::Unexpected, "No primary mount found")) + }) + .await + } + + pub async fn sign(&self, req: request::Builder) -> Result { + let mut signer = self.signer.lock().await; + if !signer.token.is_empty() { + return Ok(req.header( + header::AUTHORIZATION, + format!("Token token={}", signer.token), + )); + } + + let url = format!("{}/token", self.endpoint); + + let body = json!({ + "email": self.email, + "password": self.password, + }); + + let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + + let auth_req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(bs))) + .map_err(new_request_build_error)?; + + let resp = self.client.send(auth_req).await?; + + let status = resp.status(); + + if status != StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + let resp: TokenResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + signer.token = resp.token; + + Ok(req.header( + header::AUTHORIZATION, + format!("Token token={}", signer.token), + )) + } +} + +impl KoofrCore { + pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> { + let mut dirs = VecDeque::default(); + + let mut p = build_abs_path(&self.root, path); + + while p != "/" { + let parent = get_parent(&p).to_string(); + + dirs.push_front(parent.clone()); + p = parent; + } + + for dir in dirs { + self.create_dir(&dir).await?; + } + + Ok(()) + } + + pub async fn create_dir(&self, path: &str) -> Result<()> { + let resp = self.info(path).await?; + + let status = resp.status(); + + match status { + StatusCode::NOT_FOUND => { + let name = get_basename(path).trim_end_matches('/'); + let parent = get_parent(path); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/folder?path={}", + self.endpoint, + mount_id, + percent_encode_path(parent) + ); + + let body = json!({ + "name": name + }); + + let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + + let req = Request::post(url); + + let req = self.sign(req).await?; + + let req = req + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(bs))) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } + + pub async fn info(&self, path: &str) -> Result> { + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/info?path={}", + self.endpoint, + mount_id, + percent_encode_path(path) + ); + + let req = Request::get(url); + + let req = self.sign(req).await?; + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn get(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/get?path={}", + self.endpoint, + mount_id, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let req = self.sign(req).await?; + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn put(&self, path: &str, bs: Bytes) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let filename = get_basename(&path); + let parent = get_parent(&path); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/content/api/v2/mounts/{}/files/put?path={}&filename={}&info=true&overwriteIgnoreNonexisting=&autorename=false&overwrite=true", + self.endpoint, + mount_id, + percent_encode_path(parent), + percent_encode_path(filename) + ); + + let file_part = FormDataPart::new("file") + .header( + header::CONTENT_DISPOSITION, + format!("form-data; name=\"file\"; filename=\"{filename}\"") + .parse() + .unwrap(), + ) + .content(bs); + + let multipart = Multipart::new().part(file_part); + + let req = Request::post(url); + + let req = self.sign(req).await?; + + let req = multipart.apply(req)?; + + self.send(req).await + } + + pub async fn remove(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/remove?path={}", + self.endpoint, + mount_id, + percent_encode_path(&path) + ); + + let req = Request::delete(url); + + let req = self.sign(req).await?; + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result> { + let from = build_rooted_abs_path(&self.root, from); + let to = build_rooted_abs_path(&self.root, to); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/copy?path={}", + self.endpoint, + mount_id, + percent_encode_path(&from), + ); + + let body = json!({ + "toMountId": mount_id, + "toPath": to, + }); + + let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + + let req = Request::put(url); + + let req = self.sign(req).await?; + + let req = req + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(bs))) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn move_object(&self, from: &str, to: &str) -> Result> { + let from = build_rooted_abs_path(&self.root, from); + let to = build_rooted_abs_path(&self.root, to); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/move?path={}", + self.endpoint, + mount_id, + percent_encode_path(&from), + ); + + let body = json!({ + "toMountId": mount_id, + "toPath": to, + }); + + let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + + let req = Request::put(url); + + let req = self.sign(req).await?; + + let req = req + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(bs))) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn list(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let mount_id = self.get_mount_id().await?; + + let url = format!( + "{}/api/v2/mounts/{}/files/list?path={}", + self.endpoint, + mount_id, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let req = self.sign(req).await?; + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } +} + +#[derive(Clone, Default)] +pub struct KoofrSigner { + pub token: String, +} + +#[derive(Debug, Deserialize)] +pub struct TokenResponse { + pub token: String, +} + +#[derive(Debug, Deserialize)] +pub struct MountsResponse { + pub mounts: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Mount { + pub id: String, + pub is_primary: bool, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListResponse { + pub files: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct File { + pub name: String, + #[serde(rename = "type")] + pub ty: String, + pub size: u64, + pub modified: i64, + pub content_type: String, +} diff --git a/core/src/services/koofr/docs.md b/core/src/services/koofr/docs.md new file mode 100644 index 00000000000..e7bea550bf0 --- /dev/null +++ b/core/src/services/koofr/docs.md @@ -0,0 +1,53 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] copy +- [x] rename +- [x] list +- [x] scan +- [ ] presign +- [ ] blocking + +## Configuration + +- `root`: Set the work directory for backend +- `endpoint`: Koofr endpoint +- `email` Koofr email +- `password` Koofr password + +You can refer to [`KoofrBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Koofr; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // create backend builder + let mut builder = Koofr::default(); + + // set the storage bucket for OpenDAL + builder.root("/"); + // set the bucket for OpenDAL + builder.endpoint("https://api.koofr.net/"); + // set the email for OpenDAL + builder.email("me@example.com"); + // set the password for OpenDAL + builder.password("xxx xxx xxx xxx"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/koofr/error.rs b/core/src/services/koofr/error.rs new file mode 100644 index 00000000000..c3104c781fd --- /dev/null +++ b/core/src/services/koofr/error.rs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use http::Response; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status.as_u16() { + 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 304 | 412 => (ErrorKind::ConditionNotMatch, false), + // Service like Koofr could return 499 error with a message like: + // Client Disconnect, we should retry it. + 499 => (ErrorKind::Unexpected, true), + 500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let message = String::from_utf8_lossy(&bs).into_owned(); + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +#[cfg(test)] +mod test { + use futures::stream; + use http::StatusCode; + + use super::*; + + #[tokio::test] + async fn test_parse_error() { + let err_res = vec![(r#""#, ErrorKind::NotFound, StatusCode::NOT_FOUND)]; + + for res in err_res { + let bs = bytes::Bytes::from(res.0); + let body = IncomingAsyncBody::new( + Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))), + None, + ); + let resp = Response::builder().status(res.2).body(body).unwrap(); + + let err = parse_error(resp).await; + + assert!(err.is_ok()); + assert_eq!(err.unwrap().kind(), res.1); + } + } +} diff --git a/core/src/services/koofr/lister.rs b/core/src/services/koofr/lister.rs new file mode 100644 index 00000000000..8ad58a58d2c --- /dev/null +++ b/core/src/services/koofr/lister.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use super::core::KoofrCore; +use super::core::ListResponse; +use super::error::parse_error; +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +pub struct KoofrLister { + core: Arc, + + path: String, +} + +impl KoofrLister { + pub(super) fn new(core: Arc, path: &str) -> Self { + KoofrLister { + core, + path: path.to_string(), + } + } +} + +#[async_trait] +impl oio::PageList for KoofrLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self.core.list(&self.path).await?; + + if resp.status() == http::StatusCode::NOT_FOUND { + ctx.done = true; + return Ok(()); + } + + match resp.status() { + http::StatusCode::OK => {} + _ => { + return Err(parse_error(resp).await?); + } + } + + let bs = resp.into_body().bytes().await?; + + let response: ListResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + for file in response.files { + let path = build_abs_path(&normalize_root(&self.path), &file.name); + + let entry = if file.ty == "dir" { + let path = format!("{}/", path); + Entry::new(&path, Metadata::new(EntryMode::DIR)) + } else { + let m = Metadata::new(EntryMode::FILE) + .with_content_length(file.size) + .with_content_type(file.content_type) + .with_last_modified(parse_datetime_from_from_timestamp_millis(file.modified)?); + Entry::new(&path, m) + }; + + ctx.entries.push_back(entry); + } + + ctx.done = true; + + Ok(()) + } +} diff --git a/core/src/services/koofr/mod.rs b/core/src/services/koofr/mod.rs new file mode 100644 index 00000000000..445d69fbb82 --- /dev/null +++ b/core/src/services/koofr/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::KoofrBuilder as Koofr; +pub use backend::KoofrConfig; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/koofr/writer.rs b/core/src/services/koofr/writer.rs new file mode 100644 index 00000000000..e59b2a1d51a --- /dev/null +++ b/core/src/services/koofr/writer.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; + +use super::core::KoofrCore; +use super::error::parse_error; +use crate::raw::*; +use crate::*; + +pub type KoofrWriters = oio::OneShotWriter; + +pub struct KoofrWriter { + core: Arc, + path: String, +} + +impl KoofrWriter { + pub fn new(core: Arc, path: String) -> Self { + KoofrWriter { core, path } + } +} + +#[async_trait] +impl oio::OneShotWrite for KoofrWriter { + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + self.core.ensure_dir_exists(&self.path).await?; + + let bs = bs.bytes(bs.remaining()); + + let resp = self.core.put(&self.path, bs).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 8aef3e8f291..3f4c63af2bc 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -366,3 +366,10 @@ mod yandex_disk; pub use yandex_disk::YandexDisk; #[cfg(feature = "services-yandex-disk")] pub use yandex_disk::YandexDiskConfig; + +#[cfg(feature = "services-koofr")] +mod koofr; +#[cfg(feature = "services-koofr")] +pub use koofr::Koofr; +#[cfg(feature = "services-koofr")] +pub use koofr::KoofrConfig; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 91d4a2cf371..deeb124a98c 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -159,6 +159,8 @@ impl Operator { Scheme::Alluxio => Self::from_map::(map)?.finish(), #[cfg(feature = "services-upyun")] Scheme::Upyun => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-koofr")] + Scheme::Koofr => Self::from_map::(map)?.finish(), #[cfg(feature = "services-yandex-disk")] Scheme::YandexDisk => Self::from_map::(map)?.finish(), #[cfg(feature = "services-pcloud")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 289fb9ca365..7bc2136ff07 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -48,6 +48,8 @@ pub enum Scheme { YandexDisk, /// [Pcloud][crate::services::Pcloud]: Pcloud Services. Pcloud, + /// [Koofr][crate::services::Koofr]: Koofr Services. + Koofr, /// [Chainsafe][crate::services::Chainsafe]: Chainsafe Services. Chainsafe, /// [cacache][crate::services::Cacache]: cacache backend support. @@ -334,6 +336,7 @@ impl FromStr for Scheme { "ftp" | "ftps" => Ok(Scheme::Ftp), "ipfs" | "ipns" => Ok(Scheme::Ipfs), "ipmfs" => Ok(Scheme::Ipmfs), + "koofr" => Ok(Scheme::Koofr), "libsql" => Ok(Scheme::Libsql), "memcached" => Ok(Scheme::Memcached), "memory" => Ok(Scheme::Memory), @@ -396,6 +399,7 @@ impl From for &'static str { Scheme::Ftp => "ftp", Scheme::Ipfs => "ipfs", Scheme::Ipmfs => "ipmfs", + Scheme::Koofr => "koofr", Scheme::Libsql => "libsql", Scheme::Memcached => "memcached", Scheme::Memory => "memory",