Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/dropbox): impl create_dir and polish error handling #2600

Merged
merged 14 commits into from
Jul 6, 2023
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,7 @@ OPENDAL_REDB_TABLE=redb-table
# cacache
OPENDAL_CACACHE_TEST=false
OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
#dropbox
OPENDAL_DROPBOX_TEST=false
OPENDAL_DROPBOX_ROOT=/tmp/opendal/
OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
52 changes: 45 additions & 7 deletions core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,36 @@ impl Accessor for DropboxBackend {
ma.set_scheme(Scheme::Dropbox)
.set_root(&self.core.root)
.set_capability(Capability {
stat: true,

read: true,

write: true,

create_dir: true,

delete: true,

..Default::default()
});
ma
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let resp = self.core.dropbox_create_folder(path).await?;
let status = resp.status();
match status {
StatusCode::OK => Ok(RpCreateDir::default()),
_ => {
let err = parse_error(resp).await?;
match err.kind() {
ErrorKind::AlreadyExists => Ok(RpCreateDir::default()),
_ => Err(err),
}
}
}
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.dropbox_get(path).await?;
let status = resp.status();
Expand Down Expand Up @@ -88,7 +110,13 @@ impl Accessor for DropboxBackend {

match status {
StatusCode::OK => Ok(RpDelete::default()),
_ => Err(parse_error(resp).await?),
_ => {
let err = parse_error(resp).await?;
match err.kind() {
ErrorKind::NotFound => Ok(RpDelete::default()),
_ => Err(err),
}
}
}
}

Expand All @@ -109,12 +137,22 @@ impl Accessor for DropboxBackend {
_ => EntryMode::Unknown,
};
let mut metadata = Metadata::new(entry_mode);
let last_modified = decoded_response.client_modified;
let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?;
metadata.set_last_modified(date_utc_last_modified);
if decoded_response.size.is_some() {
let size = decoded_response.size.unwrap();
metadata.set_content_length(size);
// Only set last_modified and size if entry_mode is FILE, because Dropbox API
// returns last_modified and size only for files.
// FYI: https://www.dropbox.com/developers/documentation/http/documentation#files-get_metadata
if entry_mode == EntryMode::FILE {
let date_utc_last_modified =
parse_datetime_from_rfc3339(&decoded_response.client_modified)?;
metadata.set_last_modified(date_utc_last_modified);

if let Some(size) = decoded_response.size {
metadata.set_content_length(size);
} else {
return Err(Error::new(
ErrorKind::Unexpected,
&format!("no size found for file {}", path),
));
}
}
Ok(RpStat::new(metadata))
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/dropbox/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl Builder for DropboxBuilder {

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = Self::default();
map.get("root").map(|v| builder.root(v));
map.get("access_token").map(|v| builder.access_token(v));
builder
}
Expand Down
44 changes: 39 additions & 5 deletions core/src/services/dropbox/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ impl Debug for DropboxCore {
}

impl DropboxCore {
fn build_path(&self, path: &str) -> String {
let path = build_rooted_abs_path(&self.root, path);
// For dropbox, even the path is a directory,
// we still need to remove the trailing slash.
path.trim_end_matches('/').to_string()
}

pub async fn dropbox_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let url: String = "https://content.dropboxapi.com/2/files/download".to_string();
let download_args = DropboxDownloadArgs {
Expand All @@ -59,6 +66,7 @@ impl DropboxCore {
let request = self
.build_auth_header(Request::post(&url))
.header("Dropbox-API-Arg", request_payload)
.header(header::CONTENT_LENGTH, 0)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send(request).await
Expand All @@ -80,9 +88,11 @@ impl DropboxCore {
if let Some(size) = size {
request_builder = request_builder.header(header::CONTENT_LENGTH, size);
}
if let Some(mime) = content_type {
request_builder = request_builder.header(header::CONTENT_TYPE, mime);
}
request_builder = request_builder.header(
header::CONTENT_TYPE,
content_type.unwrap_or("application/octet-stream"),
);

let request = self
.build_auth_header(request_builder)
.header(
Expand All @@ -98,14 +108,32 @@ impl DropboxCore {
pub async fn dropbox_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let url = "https://api.dropboxapi.com/2/files/delete_v2".to_string();
let args = DropboxDeleteArgs {
path: build_rooted_abs_path(&self.root, path),
path: self.build_path(path),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let request = self
.build_auth_header(Request::post(&url))
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_LENGTH, bs.len())
.body(AsyncBody::Bytes(bs))
.map_err(new_request_build_error)?;
self.client.send(request).await
}

pub async fn dropbox_create_folder(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let url = "https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
let args = DropboxCreateFolderArgs {
path: self.build_path(path),
};

let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);

let request = self
.build_auth_header(Request::post(&url))
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_LENGTH, bs.len())
.body(AsyncBody::Bytes(bs))
.map_err(new_request_build_error)?;
self.client.send(request).await
Expand All @@ -114,7 +142,7 @@ impl DropboxCore {
pub async fn dropbox_get_metadata(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let url = "https://api.dropboxapi.com/2/files/get_metadata".to_string();
let args = DropboxMetadataArgs {
path: build_rooted_abs_path(&self.root, path),
path: self.build_path(path),
..Default::default()
};

Expand All @@ -123,6 +151,7 @@ impl DropboxCore {
let request = self
.build_auth_header(Request::post(&url))
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_LENGTH, bs.len())
.body(AsyncBody::Bytes(bs))
.map_err(new_request_build_error)?;
self.client.send(request).await
Expand Down Expand Up @@ -154,6 +183,11 @@ struct DropboxDeleteArgs {
path: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxCreateFolderArgs {
path: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxMetadataArgs {
include_deleted: bool,
Expand Down
30 changes: 28 additions & 2 deletions core/src/services/dropbox/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,38 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

let dropbox_error =
serde_json::from_slice::<DropboxErrorResponse>(&bs).map_err(new_json_deserialize_error);
match dropbox_error {
Ok(dropbox_error) => {
let mut err = Error::new(kind, dropbox_error.error_summary.as_ref())
.with_context("response", format!("{parts:?}"));
// We cannot get the error type from the response header when the status code is 409.
// Because Dropbox API v2 will put error summary in the response body,
// we need to parse it to get the correct error type and then error kind.
// See https://www.dropbox.com/developers/documentation/http/documentation#error-handling
let error_summary = dropbox_error.error_summary.as_str();

let mut err = Error::new(
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
match parts.status {
// 409 Conflict means that Endpoint-specific error.
// Look to the JSON response body for the specifics of the error.
StatusCode::CONFLICT => {
if error_summary.contains("path/not_found")
|| error_summary.contains("path_lookup/not_found")
{
ErrorKind::NotFound
} else if error_summary.contains("path/conflict") {
ErrorKind::AlreadyExists
} else {
ErrorKind::Unexpected
}
}
// Otherwise, we can get the error type from the response status code.
_ => kind,
},
error_summary,
)
.with_context("response", format!("{parts:?}"));

if retryable {
err = err.set_temporary();
Expand Down