From 91a07334353677ca3f6d81d9b2f93d63912f3f05 Mon Sep 17 00:00:00 2001 From: ClSlaid Date: Sat, 18 Feb 2023 10:12:21 +0800 Subject: [PATCH] refactor(webhdfs): handle 307 redirection instead of noredirect (#1358) * refactor: handle 307 redirection instead of noredirect OpenDAL will handle all redirections, and auto redirection of client is not taken into considerations. So for WebHDFS, handler redirections directly instead of `noredirect=true`. This also offers better compabilities. Signed-off-by: ClSlaid * refactor: move Location parser to http_util Signed-off-by: ClSlaid * refactor: use parse_error utils Signed-off-by: ClSlaid --------- Signed-off-by: ClSlaid Co-authored-by: Xuanwo --- src/raw/http_util/header.rs | 19 ++++++ src/raw/http_util/mod.rs | 1 + src/services/webhdfs/backend.rs | 110 ++++++++++++++------------------ src/services/webhdfs/message.rs | 16 ----- 4 files changed, 69 insertions(+), 77 deletions(-) diff --git a/src/raw/http_util/header.rs b/src/raw/http_util/header.rs index 7c306a1c6d7..7ef6356d47e 100644 --- a/src/raw/http_util/header.rs +++ b/src/raw/http_util/header.rs @@ -19,6 +19,7 @@ use http::header::CONTENT_RANGE; use http::header::CONTENT_TYPE; use http::header::ETAG; use http::header::LAST_MODIFIED; +use http::header::LOCATION; use http::HeaderMap; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; @@ -30,6 +31,24 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; +/// Parse redirect location from header map +/// +/// # Note +/// The returned value maybe a relative path, like `/index.html`, `/robots.txt`, etc. +pub fn parse_location(headers: &HeaderMap) -> Result> { + match headers.get(LOCATION) { + None => Ok(None), + Some(v) => Ok(Some(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("http_util::parse_location") + .set_source(e) + })?)), + } +} + /// Parse content length from header map. pub fn parse_content_length(headers: &HeaderMap) -> Result> { match headers.get(CONTENT_LENGTH) { diff --git a/src/raw/http_util/mod.rs b/src/raw/http_util/mod.rs index 114e8101113..b3ed4a63d12 100644 --- a/src/raw/http_util/mod.rs +++ b/src/raw/http_util/mod.rs @@ -36,6 +36,7 @@ pub use header::parse_content_type; pub use header::parse_etag; pub use header::parse_into_object_metadata; pub use header::parse_last_modified; +pub use header::parse_location; mod uri; pub use uri::percent_encode_path; diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index 4db071ca621..0c830ee5981 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -32,7 +32,6 @@ use super::message::BooleanResp; use super::message::FileStatusType; use super::message::FileStatusWrapper; use super::message::FileStatusesWrapper; -use super::message::Redirection; use crate::ops::*; use crate::raw::*; use crate::*; @@ -271,7 +270,7 @@ impl WebhdfsBackend { "CREATE" }; let mut url = format!( - "{}/webhdfs/v1/{}?op={}&overwrite=true&noredirect=true", + "{}/webhdfs/v1/{}?op={}&overwrite=true", self.endpoint, percent_encode_path(&p), op, @@ -283,6 +282,7 @@ impl WebhdfsBackend { let req = Request::put(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; + // mkdir does not redirect if path.ends_with('/') { return Ok(req); @@ -290,14 +290,27 @@ impl WebhdfsBackend { let resp = self.client.send_async(req).await?; - self.webhdfs_put_redirect(resp, size, content_type, body) - .await + // should be a 307 TEMPORARY_REDIRECT + if resp.status() != StatusCode::TEMPORARY_REDIRECT { + return Err(parse_error(resp).await?); + } + let re_url = self.follow_redirect(resp)?; + + let mut re_builder = Request::put(re_url); + if let Some(size) = size { + re_builder = re_builder.header(CONTENT_LENGTH, size.to_string()); + } + if let Some(content_type) = content_type { + re_builder = re_builder.header(CONTENT_TYPE, content_type); + } + + re_builder.body(body).map_err(new_request_build_error) } async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) -> Result> { let p = build_abs_path(&self.root, path); let mut url = format!( - "{}/webhdfs/v1/{}?op=OPEN&noredirect=true", + "{}/webhdfs/v1/{}?op=OPEN", self.endpoint, percent_encode_path(&p), ); @@ -378,15 +391,16 @@ impl WebhdfsBackend { let req = self.webhdfs_open_req(path, &range).await?; let resp = self.client.send_async(req).await?; - // this should be an 200 OK http response - // with JSON redirect message in its body - if resp.status() != StatusCode::OK { - // let the outside handle this error - return Ok(resp); + // this should be a 307 redirect + if resp.status() != StatusCode::TEMPORARY_REDIRECT { + return Err(parse_error(resp).await?); } - let redirected = self.webhdfs_get_redirect(resp).await?; - self.client.send_async(redirected).await + let re_url = self.follow_redirect(resp)?; + let re_req = Request::get(&re_url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.client.send_async(re_req).await } async fn webhdfs_status_object(&self, path: &str) -> Result> { @@ -427,56 +441,32 @@ impl WebhdfsBackend { self.client.send_async(req).await } - - /// get redirect destination from 307 TEMPORARY_REDIRECT http response - async fn follow_redirect(&self, resp: Response) -> Result { - let bs = resp.into_body().bytes().await.map_err(|e| { - Error::new(ErrorKind::Unexpected, "redirection receive fail") - .with_context("service", Scheme::Webhdfs) - .set_source(e) - })?; - let loc = serde_json::from_reader::<_, Redirection>(bs.reader()) - .map_err(|e| { - Error::new(ErrorKind::Unexpected, "redirection fail") - .with_context("service", Scheme::Webhdfs) - .set_permanent() - .set_source(e) - })? - .location; - - Ok(loc) - } } impl WebhdfsBackend { - async fn webhdfs_get_redirect( - &self, - redirection: Response, - ) -> Result> { - let redirect = self.follow_redirect(redirection).await?; - - Request::get(redirect) - .body(AsyncBody::Empty) - .map_err(new_request_build_error) - } - - async fn webhdfs_put_redirect( - &self, - resp: Response, - size: Option, - content_type: Option<&str>, - body: AsyncBody, - ) -> Result> { - let redirect = self.follow_redirect(resp).await?; - - let mut req = Request::put(redirect); - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size.to_string()); - } - if let Some(content_type) = content_type { - req = req.header(CONTENT_TYPE, content_type); - } - req.body(body).map_err(new_request_build_error) + /// get redirect destination from 307 TEMPORARY_REDIRECT http response + fn follow_redirect(&self, resp: Response) -> Result { + let loc = match parse_location(resp.headers())? { + Some(p) => { + if !p.starts_with('/') { + // is not relative path + p.to_string() + } else { + // is relative path + // prefix with endpoint url + let url = self.endpoint.clone(); + format!("{url}/{p}") + } + } + None => { + let err = Error::new( + ErrorKind::Unexpected, + "redirection fail: no location header", + ); + return Err(err); + } + }; + Ok(loc) } fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) -> Result { @@ -497,9 +487,7 @@ impl WebhdfsBackend { )); } } -} -impl WebhdfsBackend { async fn check_root(&self) -> Result<()> { let resp = self.webhdfs_status_object("/").await?; match resp.status() { diff --git a/src/services/webhdfs/message.rs b/src/services/webhdfs/message.rs index 1f4f43acac6..2ea848a4429 100644 --- a/src/services/webhdfs/message.rs +++ b/src/services/webhdfs/message.rs @@ -18,12 +18,6 @@ use serde::Deserialize; use crate::*; -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub(super) struct Redirection { - pub location: String, -} - #[derive(Debug, Deserialize)] pub(super) struct BooleanResp { pub boolean: bool, @@ -89,16 +83,6 @@ mod test { use crate::services::webhdfs::dir_stream::DirStream; use crate::ObjectMode; - #[test] - fn test_file_statuses() { - let json = r#"{"Location":"http://:/webhdfs/v1/?op=CREATE..."}"#; - let redir: Redirection = serde_json::from_str(json).expect("must success"); - assert_eq!( - redir.location, - "http://:/webhdfs/v1/?op=CREATE..." - ); - } - #[test] fn test_file_status() { let json = r#"