Skip to content

Commit

Permalink
Merge branch 'main' into webdav-list-op
Browse files Browse the repository at this point in the history
  • Loading branch information
imWildCat authored Feb 18, 2023
2 parents b04b300 + 91a0733 commit 1ff5211
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 77 deletions.
19 changes: 19 additions & 0 deletions src/raw/http_util/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
use http::header::ETAG;
use http::header::LAST_MODIFIED;
use http::header::LOCATION;
use http::HeaderMap;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
Expand All @@ -30,6 +31,24 @@ use crate::ObjectMetadata;
use crate::ObjectMode;
use crate::Result;

/// Parse redirect location from header map
///
/// # Note
/// The returned value maybe a relative path, like `/index.html`, `/robots.txt`, etc.
pub fn parse_location(headers: &HeaderMap) -> Result<Option<&str>> {
match headers.get(LOCATION) {
None => Ok(None),
Some(v) => Ok(Some(v.to_str().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"header value has to be valid utf-8 string",
)
.with_operation("http_util::parse_location")
.set_source(e)
})?)),
}
}

/// Parse content length from header map.
pub fn parse_content_length(headers: &HeaderMap) -> Result<Option<u64>> {
match headers.get(CONTENT_LENGTH) {
Expand Down
1 change: 1 addition & 0 deletions src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use header::parse_content_type;
pub use header::parse_etag;
pub use header::parse_into_object_metadata;
pub use header::parse_last_modified;
pub use header::parse_location;

mod uri;
pub use uri::percent_encode_path;
Expand Down
110 changes: 49 additions & 61 deletions src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::message::FileStatusesWrapper;
use super::message::Redirection;
use crate::ops::*;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -271,7 +270,7 @@ impl WebhdfsBackend {
"CREATE"
};
let mut url = format!(
"{}/webhdfs/v1/{}?op={}&overwrite=true&noredirect=true",
"{}/webhdfs/v1/{}?op={}&overwrite=true",
self.endpoint,
percent_encode_path(&p),
op,
Expand All @@ -283,21 +282,35 @@ impl WebhdfsBackend {
let req = Request::put(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

// mkdir does not redirect
if path.ends_with('/') {
return Ok(req);
}

let resp = self.client.send_async(req).await?;

self.webhdfs_put_redirect(resp, size, content_type, body)
.await
// should be a 307 TEMPORARY_REDIRECT
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
return Err(parse_error(resp).await?);
}
let re_url = self.follow_redirect(resp)?;

let mut re_builder = Request::put(re_url);
if let Some(size) = size {
re_builder = re_builder.header(CONTENT_LENGTH, size.to_string());
}
if let Some(content_type) = content_type {
re_builder = re_builder.header(CONTENT_TYPE, content_type);
}

re_builder.body(body).map_err(new_request_build_error)
}

async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=OPEN&noredirect=true",
"{}/webhdfs/v1/{}?op=OPEN",
self.endpoint,
percent_encode_path(&p),
);
Expand Down Expand Up @@ -378,15 +391,16 @@ impl WebhdfsBackend {
let req = self.webhdfs_open_req(path, &range).await?;
let resp = self.client.send_async(req).await?;

// this should be an 200 OK http response
// with JSON redirect message in its body
if resp.status() != StatusCode::OK {
// let the outside handle this error
return Ok(resp);
// this should be a 307 redirect
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
return Err(parse_error(resp).await?);
}

let redirected = self.webhdfs_get_redirect(resp).await?;
self.client.send_async(redirected).await
let re_url = self.follow_redirect(resp)?;
let re_req = Request::get(&re_url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send_async(re_req).await
}

async fn webhdfs_status_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
Expand Down Expand Up @@ -427,56 +441,32 @@ impl WebhdfsBackend {

self.client.send_async(req).await
}

/// get redirect destination from 307 TEMPORARY_REDIRECT http response
async fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
let bs = resp.into_body().bytes().await.map_err(|e| {
Error::new(ErrorKind::Unexpected, "redirection receive fail")
.with_context("service", Scheme::Webhdfs)
.set_source(e)
})?;
let loc = serde_json::from_reader::<_, Redirection>(bs.reader())
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "redirection fail")
.with_context("service", Scheme::Webhdfs)
.set_permanent()
.set_source(e)
})?
.location;

Ok(loc)
}
}

impl WebhdfsBackend {
async fn webhdfs_get_redirect(
&self,
redirection: Response<IncomingAsyncBody>,
) -> Result<Request<AsyncBody>> {
let redirect = self.follow_redirect(redirection).await?;

Request::get(redirect)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)
}

async fn webhdfs_put_redirect(
&self,
resp: Response<IncomingAsyncBody>,
size: Option<u64>,
content_type: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let redirect = self.follow_redirect(resp).await?;

let mut req = Request::put(redirect);
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string());
}
if let Some(content_type) = content_type {
req = req.header(CONTENT_TYPE, content_type);
}
req.body(body).map_err(new_request_build_error)
/// get redirect destination from 307 TEMPORARY_REDIRECT http response
fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
let loc = match parse_location(resp.headers())? {
Some(p) => {
if !p.starts_with('/') {
// is not relative path
p.to_string()
} else {
// is relative path
// prefix with endpoint url
let url = self.endpoint.clone();
format!("{url}/{p}")
}
}
None => {
let err = Error::new(
ErrorKind::Unexpected,
"redirection fail: no location header",
);
return Err(err);
}
};
Ok(loc)
}

fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) -> Result<RpCreate> {
Expand All @@ -497,9 +487,7 @@ impl WebhdfsBackend {
));
}
}
}

impl WebhdfsBackend {
async fn check_root(&self) -> Result<()> {
let resp = self.webhdfs_status_object("/").await?;
match resp.status() {
Expand Down
16 changes: 0 additions & 16 deletions src/services/webhdfs/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ use serde::Deserialize;

use crate::*;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct Redirection {
pub location: String,
}

#[derive(Debug, Deserialize)]
pub(super) struct BooleanResp {
pub boolean: bool,
Expand Down Expand Up @@ -89,16 +83,6 @@ mod test {
use crate::services::webhdfs::dir_stream::DirStream;
use crate::ObjectMode;

#[test]
fn test_file_statuses() {
let json = r#"{"Location":"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."}"#;
let redir: Redirection = serde_json::from_str(json).expect("must success");
assert_eq!(
redir.location,
"http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."
);
}

#[test]
fn test_file_status() {
let json = r#"
Expand Down

0 comments on commit 1ff5211

Please sign in to comment.