Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/webhdfs): Implement multi write via CONCAT #3939

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ OPENDAL_WEBDAV_ENDPOINT=http://127.0.0.1:8080
OPENDAL_WEBHDFS_ROOT=/tmp/opendal/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_DELEGATION=<delegation>
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=false
# supabase
OPENDAL_SUPABASE_BUCKET=<bucket>
Expand Down
1 change: 1 addition & 0 deletions .github/services/webhdfs/webhdfs/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ runs:
cat << EOF >> $GITHUB_ENV
OPENDAL_WEBHDFS_ROOT=/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
EOF
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ runs:
cat << EOF >> $GITHUB_ENV
OPENDAL_WEBHDFS_ROOT=/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=true
EOF
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ edition = "2021"
homepage = "https://opendal.apache.org/"
license = "Apache-2.0"
repository = "https://github.com/apache/incubator-opendal"
rust-version = "1.75"
rust-version = "1.67"
version = "0.44.1"

[workspace.dependencies]
Expand Down
11 changes: 6 additions & 5 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ where
})));
let size = self.fill_cache(bs);
return Poll::Ready(Ok(size));
} else {
ready!(self.futures.poll_next_unpin(cx));
} else if let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
res?;
}
}
State::Close(_) => {
Expand Down Expand Up @@ -209,8 +209,7 @@ where
}));
}
}
}
if self.futures.is_empty() && self.cache.is_none() {
} else if self.futures.is_empty() && self.cache.is_none() {
self.state =
State::Close(Box::pin(
async move { w.complete_block(block_ids).await },
Expand All @@ -232,7 +231,9 @@ where
})));
}
}
while ready!(self.futures.poll_next_unpin(cx)).is_some() {}
while let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
res?;
}
}
}
State::Close(fut) => {
Expand Down
161 changes: 131 additions & 30 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ pub struct WebhdfsBuilder {
endpoint: Option<String>,
delegation: Option<String>,
disable_list_batch: bool,
/// atomic_write_dir of this backend
pub atomic_write_dir: Option<String>,
}

impl Debug for WebhdfsBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Debug output deliberately shows only non-sensitive fields;
    // `finish_non_exhaustive` marks that other fields (e.g. the
    // delegation token) are intentionally omitted.
    f.debug_struct("Builder")
        .field("root", &self.root)
        .field("endpoint", &self.endpoint)
        .field("atomic_write_dir", &self.atomic_write_dir)
        .finish_non_exhaustive()
}
}
Expand Down Expand Up @@ -117,6 +120,20 @@ impl WebhdfsBuilder {
self.disable_list_batch = true;
self
}

/// Set the temporary directory used for atomic writes,
/// e.g. `.opendal_tmp/`.
///
/// # Notes
///
/// Multi write is only supported when this is set; passing an
/// empty string leaves it unset.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
    // Treat the empty string as "disabled" rather than a real directory.
    self.atomic_write_dir = (!dir.is_empty()).then(|| dir.to_string());
    self
}
}

impl Builder for WebhdfsBuilder {
Expand All @@ -132,6 +149,8 @@ impl Builder for WebhdfsBuilder {
map.get("disable_list_batch")
.filter(|v| v == &"true")
.map(|_| builder.disable_list_batch());
map.get("atomic_write_dir")
.map(|v| builder.atomic_write_dir(v));

builder
}
Expand Down Expand Up @@ -162,6 +181,8 @@ impl Builder for WebhdfsBuilder {
};
debug!("backend use endpoint {}", endpoint);

let atomic_write_dir = self.atomic_write_dir.take();

let auth = self
.delegation
.take()
Expand All @@ -175,6 +196,7 @@ impl Builder for WebhdfsBuilder {
auth,
client,
root_checker: OnceCell::new(),
atomic_write_dir,
disable_list_batch: self.disable_list_batch,
};

Expand All @@ -190,52 +212,78 @@ pub struct WebhdfsBackend {
auth: Option<String>,
root_checker: OnceCell<()>,

pub atomic_write_dir: Option<String>,
pub disable_list_batch: bool,
pub client: HttpClient,
}

impl WebhdfsBackend {
/// create object or make a directory
///
/// TODO: we should split it into mkdir and create
pub fn webhdfs_create_object_request(
pub fn webhdfs_create_dir_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

let mut url = format!(
"{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
self.endpoint,
percent_encode_path(&p),
);
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}

let req = Request::put(&url);

req.body(AsyncBody::Empty).map_err(new_request_build_error)
}
/// create object
pub async fn webhdfs_create_object_request(
&self,
path: &str,
size: Option<usize>,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let op = if path.ends_with('/') {
"MKDIRS"
} else {
"CREATE"
};

let mut url = format!(
"{}/webhdfs/v1/{}?op={}&overwrite=true",
"{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
self.endpoint,
percent_encode_path(&p),
op,
);
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}

let mut req = Request::put(&url);
let req = Request::put(&url);

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

// mkdir does not redirect
if path.ends_with('/') {
return req.body(AsyncBody::Empty).map_err(new_request_build_error);
let resp = self.client.send(req).await?;

let status = resp.status();

if status != StatusCode::CREATED && status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

let bs = resp.into_body().bytes().await?;

let resp =
serde_json::from_slice::<LocationResponse>(&bs).map_err(new_json_deserialize_error)?;

let mut req = Request::put(&resp.location);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string());
}
req = req.header(CONTENT_LENGTH, size);
};

if let Some(content_type) = args.content_type() {
req = req.header(CONTENT_TYPE, content_type);
}
};
let req = req.body(body).map_err(new_request_build_error)?;

req.body(body).map_err(new_request_build_error)
Ok(req)
}

pub async fn webhdfs_init_append_request(&self, path: &str) -> Result<String> {
Expand All @@ -260,7 +308,7 @@ impl WebhdfsBackend {
match status {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let resp: InitAppendResponse =
let resp: LocationResponse =
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;

Ok(resp.location)
Expand All @@ -269,6 +317,32 @@ impl WebhdfsBackend {
}
}

/// Rename (move) the object at `from` to `to` via the `RENAME` op,
/// sending the request and returning the raw response.
pub async fn webhdfs_rename_object(
    &self,
    from: &str,
    to: &str,
) -> Result<Response<IncomingAsyncBody>> {
    let source = build_abs_path(&self.root, from);
    // RENAME requires the destination as a rooted absolute path.
    let destination = build_rooted_abs_path(&self.root, to);

    let mut url = format!(
        "{}/webhdfs/v1/{}?op=RENAME&destination={}",
        self.endpoint,
        percent_encode_path(&source),
        percent_encode_path(&destination)
    );

    if let Some(auth) = &self.auth {
        url.push_str(&format!("&{auth}"));
    }

    let request = Request::put(&url)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    self.client.send(request).await
}

pub async fn webhdfs_append_request(
&self,
location: &str,
Expand All @@ -288,6 +362,36 @@ impl WebhdfsBackend {
req.body(body).map_err(new_request_build_error)
}

/// Build a `CONCAT` request that appends every file in `sources`
/// onto the file at `path`.
pub fn webhdfs_concat_request(
    &self,
    path: &str,
    sources: Vec<String>,
) -> Result<Request<AsyncBody>> {
    let target = build_abs_path(&self.root, path);

    // CONCAT expects rooted absolute source paths, comma separated.
    let joined = sources
        .iter()
        .map(|s| build_rooted_abs_path(&self.root, s))
        .collect::<Vec<String>>()
        .join(",");

    let mut url = format!(
        "{}/webhdfs/v1/{}?op=CONCAT&sources={}",
        self.endpoint,
        percent_encode_path(&target),
        percent_encode_path(&joined),
    );

    if let Some(auth) = &self.auth {
        url.push_str(&format!("&{auth}"));
    }

    Request::post(url)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)
}

async fn webhdfs_open_request(
&self,
path: &str,
Expand Down Expand Up @@ -403,7 +507,7 @@ impl WebhdfsBackend {
self.client.send(req).await
}

async fn webhdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=DELETE&recursive=false",
Expand Down Expand Up @@ -469,6 +573,8 @@ impl Accessor for WebhdfsBackend {

write: true,
write_can_append: true,
write_can_multi: self.atomic_write_dir.is_some(),

create_dir: true,
delete: true,

Expand All @@ -481,12 +587,7 @@ impl Accessor for WebhdfsBackend {

/// Create a file or directory
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let req = self.webhdfs_create_object_request(
path,
Some(0),
&OpWrite::default(),
AsyncBody::Empty,
)?;
let req = self.webhdfs_create_dir_request(path)?;

let resp = self.client.send(req).await?;

Expand Down Expand Up @@ -585,7 +686,7 @@ impl Accessor for WebhdfsBackend {
let w = if args.append() {
WebhdfsWriters::Two(oio::AppendObjectWriter::new(w))
} else {
WebhdfsWriters::One(oio::OneShotWriter::new(w))
WebhdfsWriters::One(oio::BlockWriter::new(w, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down Expand Up @@ -619,6 +720,6 @@ impl Accessor for WebhdfsBackend {

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct InitAppendResponse {
pub(super) struct LocationResponse {
pub location: String,
}
3 changes: 3 additions & 0 deletions core/src/services/webhdfs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This service can be used to:
- `root`: The root path of the WebHDFS service.
- `endpoint`: The endpoint of the WebHDFS service.
- `delegation`: The delegation token for WebHDFS.
- `atomic_write_dir`: The temporary directory WebHDFS uses for atomic multi writes.

Refer to [`Builder`]'s public API docs for more information.

Expand Down Expand Up @@ -58,6 +59,8 @@ async fn main() -> Result<()> {
builder.endpoint("http://127.0.0.1:9870");
// set the delegation_token for builder
builder.delegation("delegation_token");
// set atomic_write_dir for builder
builder.atomic_write_dir(".opendal_tmp/");

let op: Operator = Operator::new(builder)?.finish();

Expand Down
Loading
Loading