diff --git a/.env.example b/.env.example index 746346e04f5..849e09786be 100644 --- a/.env.example +++ b/.env.example @@ -199,4 +199,7 @@ OPENDAL_CHAINSAFE_API_KEY= OPENDAL_PCLOUD_ROOT=/path/to/dir OPENDAL_PCLOUD_ENDPOINT= OPENDAL_PCLOUD_USERNAME= -OPENDAL_PCLOUD_PASSWORD= \ No newline at end of file +OPENDAL_PCLOUD_PASSWORD= +# yandex-disk +OPENDAL_YANDEX_DISK_ROOT=/path/to/dir +OPENDAL_YANDEX_DISK_ACCESS_TOKEN= diff --git a/core/Cargo.toml b/core/Cargo.toml index 11c92212a06..01344773526 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -195,6 +195,7 @@ services-vercel-artifacts = [] services-wasabi = [] services-webdav = [] services-webhdfs = [] +services-yandex-disk = [] [lib] bench = false diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index ad9d2368ac0..f67d2a23349 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -347,3 +347,10 @@ mod pcloud; pub use pcloud::Pcloud; #[cfg(feature = "services-pcloud")] pub use pcloud::PcloudConfig; + +#[cfg(feature = "services-yandex-disk")] +mod yandex_disk; +#[cfg(feature = "services-yandex-disk")] +pub use yandex_disk::YandexDisk; +#[cfg(feature = "services-yandex-disk")] +pub use yandex_disk::YandexDiskConfig; diff --git a/core/src/services/yandex_disk/backend.rs b/core/src/services/yandex_disk/backend.rs new file mode 100644 index 00000000000..d22032bc912 --- /dev/null +++ b/core/src/services/yandex_disk/backend.rs @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::Request; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use super::core::*; +use super::error::parse_error; +use super::lister::YandexDiskLister; +use super::writer::YandexDiskWriter; +use super::writer::YandexDiskWriters; +use crate::raw::*; +use crate::*; + +/// Config for backblaze YandexDisk services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct YandexDiskConfig { + /// root of this backend. + /// + /// All operations will happen under this root. + pub root: Option, + /// yandex disk oauth access_token. + pub access_token: String, +} + +impl Debug for YandexDiskConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Config"); + + ds.field("root", &self.root); + + ds.finish() + } +} + +/// [YandexDisk](https://360.yandex.com/disk/) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct YandexDiskBuilder { + config: YandexDiskConfig, + + http_client: Option, +} + +impl Debug for YandexDiskBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("YandexDiskBuilder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl YandexDiskBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// yandex disk oauth access_token. + /// The valid token will looks like `y0_XXXXXXqihqIWAADLWwAAAAD3IXXXXXX0gtVeSPeIKM0oITMGhXXXXXX`. + /// We can fetch the debug token from . + /// To use it in production, please register an app at instead. + pub fn access_token(&mut self, access_token: &str) -> &mut Self { + self.config.access_token = access_token.to_string(); + + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for YandexDiskBuilder { + const SCHEME: Scheme = Scheme::YandexDisk; + type Accessor = YandexDiskBackend; + + /// Converts a HashMap into an YandexDiskBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of YandexDiskBuilder. + fn from_map(map: HashMap) -> Self { + // Deserialize the configuration from the HashMap. + let config = YandexDiskConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an YandexDiskBuilder instance with the deserialized config. + YandexDiskBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of YandexDiskBackend. + fn build(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle oauth access_token. + if self.config.access_token.is_empty() { + return Err( + Error::new(ErrorKind::ConfigInvalid, "access_token is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::YandexDisk), + ); + } + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::YandexDisk) + })? + }; + + Ok(YandexDiskBackend { + core: Arc::new(YandexDiskCore { + root, + access_token: self.config.access_token.clone(), + client, + }), + }) + } +} + +/// Backend for YandexDisk services. +#[derive(Debug, Clone)] +pub struct YandexDiskBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for YandexDiskBackend { + type Reader = IncomingAsyncBody; + type Writer = YandexDiskWriters; + type Lister = oio::PageLister; + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::YandexDisk) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + create_dir: true, + + read: true, + + write: true, + write_can_empty: true, + + delete: true, + rename: true, + copy: true, + + list: true, + list_with_limit: true, + + ..Default::default() + }); + + am + } + + async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { + self.core.ensure_dir_exists(path).await?; + + Ok(RpCreateDir::default()) + } + + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + self.core.ensure_dir_exists(to).await?; + + let resp = self.core.move_object(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + resp.into_body().consume().await?; + + Ok(RpRename::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + self.core.ensure_dir_exists(to).await?; + + let resp = self.core.copy(from, to).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + resp.into_body().consume().await?; + + Ok(RpCopy::default()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let download_url = self.core.get_download_url(path).await?; + + let req = Request::get(download_url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let size = parse_content_length(resp.headers())?; + let range = parse_content_range(resp.headers())?; + Ok(( + RpRead::new().with_size(size).with_range(range), + resp.into_body(), + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let resp = self.core.metainformation(path, None, None).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let mf: MetainformationResponse = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + parse_info(mf).map(RpStat::new) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let writer = YandexDiskWriter::new(self.core.clone(), path.to_string()); + + let w = oio::OneShotWriter::new(writer); + + Ok((RpWrite::default(), w)) + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let resp = self.core.delete(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpDelete::default()), + StatusCode::NO_CONTENT => Ok(RpDelete::default()), + // Yandex Disk deleting a non-empty folder can take an unknown amount of time, + // So the API responds with the code 202 Accepted (the deletion process has started). + StatusCode::ACCEPTED => Ok(RpDelete::default()), + // Allow 404 when deleting a non-existing object + StatusCode::NOT_FOUND => Ok(RpDelete::default()), + _ => Err(parse_error(resp).await?), + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let l = YandexDiskLister::new(self.core.clone(), path, args.limit()); + Ok((RpList::default(), oio::PageLister::new(l))) + } +} diff --git a/core/src/services/yandex_disk/core.rs b/core/src/services/yandex_disk/core.rs new file mode 100644 index 00000000000..d3852ea79e7 --- /dev/null +++ b/core/src/services/yandex_disk/core.rs @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; + +use http::{header, request, Request, Response, StatusCode}; +use serde::Deserialize; + +use crate::raw::*; +use crate::*; + +use super::error::parse_error; + +#[derive(Clone)] +pub struct YandexDiskCore { + /// The root of this core. + pub root: String, + /// Yandex Disk oauth access_token. + pub access_token: String, + + pub client: HttpClient, +} + +impl Debug for YandexDiskCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .finish_non_exhaustive() + } +} + +impl YandexDiskCore { + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + #[inline] + pub fn sign(&self, req: request::Builder) -> request::Builder { + req.header( + header::AUTHORIZATION, + format!("OAuth {}", self.access_token), + ) + } +} + +impl YandexDiskCore { + /// Get upload url. + pub async fn get_upload_url(&self, path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, path); + + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true", + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bytes = resp.into_body().bytes().await?; + + let resp: GetUploadUrlResponse = + serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?; + + Ok(resp.href) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn get_download_url(&self, path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, path); + + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true", + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bytes = resp.into_body().bytes().await?; + + let resp: GetUploadUrlResponse = + serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?; + + Ok(resp.href) + } + _ => Err(parse_error(resp).await?), + } + } + + pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> { + let path = build_abs_path(&self.root, path); + + let paths = path.split('/').collect::>(); + + for i in 0..paths.len() - 1 { + let path = paths[..i + 1].join("/"); + let resp = self.create_dir(&path).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::CONFLICT => { + resp.into_body().consume().await?; + } + _ => return Err(parse_error(resp).await?), + } + } + Ok(()) + } + + pub async fn create_dir(&self, path: &str) -> Result> { + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources?path=/{}", + percent_encode_path(path), + ); + + let req = Request::put(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result> { + let from = build_rooted_abs_path(&self.root, from); + let to = build_rooted_abs_path(&self.root, to); + + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true", + percent_encode_path(&from), + percent_encode_path(&to) + ); + + let req = Request::post(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn move_object(&self, from: &str, to: &str) -> Result> { + let from = build_rooted_abs_path(&self.root, from); + let to = build_rooted_abs_path(&self.root, to); + + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true", + percent_encode_path(&from), + percent_encode_path(&to) + ); + + let req = Request::post(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn delete(&self, path: &str) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let url = format!( + "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true", + percent_encode_path(&path), + ); + + let req = Request::delete(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } + + pub async fn metainformation( + &self, + path: &str, + limit: Option, + offset: Option, + ) -> Result> { + let path = build_rooted_abs_path(&self.root, path); + + let mut url = format!( + "https://cloud-api.yandex.net/v1/disk/resources?path={}", + percent_encode_path(&path), + ); + + if let Some(limit) = limit { + url = format!("{}&limit={}", url, limit); + } + + if let Some(offset) = offset { + url = format!("{}&offset={}", url, offset); + } + + let req = Request::get(url); + + let req = self.sign(req); + + // Set body + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.send(req).await + } +} + +pub(super) fn parse_info(mf: MetainformationResponse) -> Result { + let mode = if mf.ty == "file" { + EntryMode::FILE + } else { + EntryMode::DIR + }; + + let mut m = Metadata::new(mode); + + m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?); + + if let Some(md5) = mf.md5 { + m.set_content_md5(&md5); + } + + if let Some(mime_type) = mf.mime_type { + m.set_content_type(&mime_type); + } + + if let Some(size) = mf.size { + m.set_content_length(size); + } + + Ok(m) +} + +#[derive(Debug, Deserialize)] +pub struct GetUploadUrlResponse { + pub href: String, +} + +#[derive(Debug, Deserialize)] +pub struct MetainformationResponse { + #[serde(rename = "type")] + pub ty: String, + pub name: String, + pub path: String, + pub modified: String, + pub md5: Option, + pub mime_type: Option, + pub size: Option, + #[serde(rename = "_embedded")] + pub embedded: Option, +} + +#[derive(Debug, Deserialize)] +pub struct Embedded { + pub total: usize, + pub items: Vec, +} diff --git a/core/src/services/yandex_disk/docs.md b/core/src/services/yandex_disk/docs.md new file mode 100644 index 00000000000..5a551197143 --- /dev/null +++ b/core/src/services/yandex_disk/docs.md @@ -0,0 +1,47 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] copy +- [x] rename +- [x] list +- [x] scan +- [ ] presign +- [ ] blocking + +## Configuration + +- `root`: Set the work directory for backend +- `access_token` YandexDisk oauth access_token + +You can refer to [`YandexDiskBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::YandexDisk; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + // create backend builder + let mut builder = YandexDisk::default(); + + // set the storage bucket for OpenDAL + builder.root("/"); + // set the access_token for OpenDAL + builder.access_token("test"); + + let op: Operator = Operator::new(builder)?.finish(); + + Ok(()) +} +``` diff --git a/core/src/services/yandex_disk/error.rs b/core/src/services/yandex_disk/error.rs new file mode 100644 index 00000000000..c56c40e84ff --- /dev/null +++ b/core/src/services/yandex_disk/error.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use quick_xml::de; +use serde::Deserialize; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// YandexDiskError is the error returned by YandexDisk service. +#[derive(Default, Debug, Deserialize)] +#[allow(unused)] +struct YandexDiskError { + message: String, + description: String, + error: String, +} + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status.as_u16() { + 400 => (ErrorKind::InvalidInput, false), + 410 | 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 499 => (ErrorKind::Unexpected, true), + 503 | 507 => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, _yandex_disk_err) = de::from_reader::<_, YandexDiskError>(bs.clone().reader()) + .map(|yandex_disk_err| (format!("{yandex_disk_err:?}"), Some(yandex_disk_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +#[cfg(test)] +mod test { + use futures::stream; + use http::StatusCode; + + use super::*; + + #[tokio::test] + async fn test_parse_error() { + let err_res = vec![ + ( + r#"{ + "message": "Не удалось найти запрошенный ресурс.", + "description": "Resource not found.", + "error": "DiskNotFoundError" + }"#, + ErrorKind::NotFound, + StatusCode::NOT_FOUND, + ), + ( + r#"{ + "message": "Не авторизован.", + "description": "Unauthorized", + "error": "UnauthorizedError" + }"#, + ErrorKind::PermissionDenied, + StatusCode::FORBIDDEN, + ), + ]; + + for res in err_res { + let bs = bytes::Bytes::from(res.0); + let body = IncomingAsyncBody::new( + Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))), + None, + ); + let resp = Response::builder().status(res.2).body(body).unwrap(); + + let err = parse_error(resp).await; + + assert!(err.is_ok()); + assert_eq!(err.unwrap().kind(), res.1); + } + } +} diff --git a/core/src/services/yandex_disk/lister.rs b/core/src/services/yandex_disk/lister.rs new file mode 100644 index 00000000000..a116ce53a08 --- /dev/null +++ b/core/src/services/yandex_disk/lister.rs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use super::core::parse_info; +use super::core::MetainformationResponse; +use super::core::YandexDiskCore; +use super::error::parse_error; +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::Result; + +pub struct YandexDiskLister { + core: Arc, + + path: String, + limit: Option, +} + +impl YandexDiskLister { + pub(super) fn new(core: Arc, path: &str, limit: Option) -> Self { + YandexDiskLister { + core, + path: path.to_string(), + limit, + } + } +} + +#[async_trait] +impl oio::PageList for YandexDiskLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let offset = if ctx.token.is_empty() { + None + } else { + Some(ctx.token.clone()) + }; + + let resp = self + .core + .metainformation(&self.path, self.limit, offset) + .await?; + + if resp.status() == http::StatusCode::NOT_FOUND { + ctx.done = true; + return Ok(()); + } + + match resp.status() { + http::StatusCode::OK => { + let body = resp.into_body().bytes().await?; + + let resp: MetainformationResponse = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + if let Some(embedded) = resp.embedded { + let n = embedded.items.len(); + + for mf in embedded.items { + let path = mf.path.strip_prefix("disk:"); + + if let Some(path) = path { + let mut path = build_rel_path(&self.core.root, path); + + let md = parse_info(mf)?; + + if md.mode().is_dir() { + path = format!("{}/", path); + } + + ctx.entries.push_back(Entry::new(&path, md)); + }; + } + + let current_len = ctx.token.parse::().unwrap_or(0) + n; + + if current_len >= embedded.total { + ctx.done = true; + } + + ctx.token = current_len.to_string(); + + return Ok(()); + } + } + http::StatusCode::NOT_FOUND => { + ctx.done = true; + return Ok(()); + } + _ => { + return Err(parse_error(resp).await?); + } + } + + Ok(()) + } +} diff --git a/core/src/services/yandex_disk/mod.rs b/core/src/services/yandex_disk/mod.rs new file mode 100644 index 00000000000..e2f2aff4488 --- /dev/null +++ b/core/src/services/yandex_disk/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::YandexDiskBuilder as YandexDisk; +pub use backend::YandexDiskConfig; + +mod core; +mod error; +mod lister; +mod writer; diff --git a/core/src/services/yandex_disk/writer.rs b/core/src/services/yandex_disk/writer.rs new file mode 100644 index 00000000000..b25d770098b --- /dev/null +++ b/core/src/services/yandex_disk/writer.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use http::{Request, StatusCode}; + +use crate::raw::*; +use crate::*; + +use super::core::YandexDiskCore; +use super::error::parse_error; + +pub type YandexDiskWriters = oio::OneShotWriter; + +pub struct YandexDiskWriter { + core: Arc, + path: String, +} + +impl YandexDiskWriter { + pub fn new(core: Arc, path: String) -> Self { + YandexDiskWriter { core, path } + } +} + +#[async_trait] +impl oio::OneShotWrite for YandexDiskWriter { + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + self.core.ensure_dir_exists(&self.path).await?; + + let upload_url = self.core.get_upload_url(&self.path).await?; + + let bs = bs.bytes(bs.remaining()); + + let req = Request::put(upload_url) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 506d1cefe47..30dcd251eda 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -159,6 +159,8 @@ impl Operator { Scheme::Alluxio => Self::from_map::(map)?.finish(), #[cfg(feature = "services-upyun")] Scheme::Upyun => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-yandex-disk")] + Scheme::YandexDisk => Self::from_map::(map)?.finish(), #[cfg(feature = "services-pcloud")] Scheme::Pcloud => Self::from_map::(map)?.finish(), #[cfg(feature = "services-chainsafe")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index f74e9926727..cdb4cf2f67b 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -44,6 +44,8 @@ pub enum Scheme { Seafile, /// [Upyun][crate::services::Upyun]: Upyun Services. Upyun, + /// [YandexDisk][crate::services::YandexDisk]: YandexDisk Services. + YandexDisk, /// [Pcloud][crate::services::Pcloud]: Pcloud Services. Pcloud, /// [Chainsafe][crate::services::Chainsafe]: Chainsafe Services. @@ -251,6 +253,8 @@ impl Scheme { Scheme::Seafile, #[cfg(feature = "services-upyun")] Scheme::Upyun, + #[cfg(feature = "services-yandex-disk")] + Scheme::YandexDisk, #[cfg(feature = "services-pcloud")] Scheme::Pcloud, #[cfg(feature = "services-sftp")] @@ -343,6 +347,7 @@ impl FromStr for Scheme { "s3" => Ok(Scheme::S3), "seafile" => Ok(Scheme::Seafile), "upyun" => Ok(Scheme::Upyun), + "yandex_disk" => Ok(Scheme::YandexDisk), "pcloud" => Ok(Scheme::Pcloud), "sftp" => Ok(Scheme::Sftp), "sled" => Ok(Scheme::Sled), @@ -417,6 +422,7 @@ impl From for &'static str { Scheme::Mongodb => "mongodb", Scheme::Alluxio => "alluxio", Scheme::Upyun => "upyun", + Scheme::YandexDisk => "yandex_disk", Scheme::Pcloud => "pcloud", Scheme::Custom(v) => v, }