Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/hdfs): Atomic write for hdfs #3875

Merged
merged 27 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
848b22e
Add HdfsConfig to implement ConfigDeserializer
shbhmrzd Dec 22, 2023
6e51ac2
Merge remote-tracking branch 'upstream/main' into add_hdfs_config
shbhmrzd Dec 22, 2023
d06bf66
fix conflicting impl debug
shbhmrzd Dec 22, 2023
50665a5
add doc for struct fields
shbhmrzd Dec 22, 2023
0fb22fd
atomic write support for hdfs
shbhmrzd Dec 31, 2023
96fba38
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Dec 31, 2023
b5c96e3
use arc of hdrs client
shbhmrzd Jan 1, 2024
dfa0e71
fix issues with test
shbhmrzd Jan 1, 2024
8b44433
cargo fmt
shbhmrzd Jan 1, 2024
e93b17d
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 1, 2024
f3d7bf6
revert .env.example
shbhmrzd Jan 1, 2024
2179349
impl sync for HdfsWriter
shbhmrzd Jan 1, 2024
a592a0d
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 1, 2024
af8f51d
take fut as mut
shbhmrzd Jan 1, 2024
d7fd22a
add atomic write test workflow
shbhmrzd Jan 1, 2024
c0e465b
use Option<F> in HdfsWriter
shbhmrzd Jan 2, 2024
55587e3
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 2, 2024
e313c5c
revert .env change
shbhmrzd Jan 2, 2024
1895b9f
no support for atomic write if append true
shbhmrzd Jan 2, 2024
474fe57
revert
shbhmrzd Jan 2, 2024
c5dd416
testing with adding atomic write dir env param
shbhmrzd Jan 2, 2024
465f9b1
revert
shbhmrzd Jan 2, 2024
dc3f8e8
add debug statements
shbhmrzd Jan 2, 2024
dafef0c
remove debugs
shbhmrzd Jan 2, 2024
0d2a780
use f.close
shbhmrzd Jan 2, 2024
d92712d
review comments
shbhmrzd Jan 2, 2024
e229a9e
review comments
shbhmrzd Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,4 @@ OPENDAL_SEAFILE_REPO_NAME=<repo_name>
OPENDAL_UPYUN_ROOT=/path/to/dir
OPENDAL_UPYUN_BUCKET=<bucket>
OPENDAL_UPYUN_OPERATOR=<operator>
OPENDAL_UPYUN_PASSWORD=<password>
# chainsafe
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved
OPENDAL_CHAINSAFE_ROOT=/path/to/dir
OPENDAL_CHAINSAFE_BUCKET_ID=<bucket_id>
OPENDAL_CHAINSAFE_API_KEY=<api_key>
OPENDAL_UPYUN_PASSWORD=<password>
99 changes: 87 additions & 12 deletions core/src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use async_trait::async_trait;
use futures::AsyncWriteExt;
use log::debug;
use serde::Deserialize;
use uuid::Uuid;

use super::lister::HdfsLister;
use super::writer::HdfsWriter;
Expand All @@ -48,6 +49,8 @@ pub struct HdfsConfig {
pub user: Option<String>,
/// enable the append capacity
pub enable_append: bool,
/// atomic_write_dir of this backend
pub atomic_write_dir: Option<String>,
}

impl Debug for HdfsConfig {
Expand Down Expand Up @@ -133,6 +136,21 @@ impl HdfsBuilder {
self.config.enable_append = enable_append;
self
}

/// Set temp dir for atomic write.
///
/// # Notes
///
/// - When append is enabled, we will not use atomic write
///   to avoid data loss and performance issues.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
    // An empty string means "disabled", matching the other builder setters.
    self.config.atomic_write_dir = (!dir.is_empty()).then(|| dir.to_string());
    self
}
}

impl Builder for HdfsBuilder {
Expand Down Expand Up @@ -181,19 +199,40 @@ impl Builder for HdfsBuilder {
}
}

let atomic_write_dir = self.config.atomic_write_dir.take();

// If atomic write dir is not exist, we must create it.
if let Some(d) = &atomic_write_dir {
if let Err(e) = client.metadata(&d) {
if e.kind() == io::ErrorKind::NotFound {
client.create_dir(&d).map_err(new_std_io_error)?
}
}
}

debug!("backend build finished: {:?}", &self);
Ok(HdfsBackend {
root,
atomic_write_dir,
client: Arc::new(client),
enable_append: self.config.enable_append,
})
}
}

/// Generate a unique temporary file name for `path`: the file's basename
/// suffixed with a random v4 UUID (`<basename>.<uuid>`).
#[inline]
fn tmp_file_of(path: &str) -> String {
    format!("{}.{}", get_basename(path), Uuid::new_v4())
}

/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
root: String,
atomic_write_dir: Option<String>,
client: Arc<hdrs::Client>,
enable_append: bool,
}
Expand Down Expand Up @@ -263,15 +302,29 @@ impl Accessor for HdfsBackend {
}

async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_rooted_abs_path(&self.root, path);
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = build_rooted_abs_path(&self.root, path);
let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));

if let Err(err) = self.client.metadata(&p) {
// If the target file exists, we should append to the end of it directly.
if op.append() && self.client.metadata(&target_path).is_ok() {
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = build_rooted_abs_path(&self.root, path);

(p, None)
};

if let Err(err) = self.client.metadata(&target_path) {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = get_parent(&p);
let parent = get_parent(&target_path);

self.client.create_dir(parent).map_err(new_std_io_error)?;

Expand All @@ -280,7 +333,7 @@ impl Accessor for HdfsBackend {
.open_file()
.create(true)
.write(true)
.async_open(&p)
.async_open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;
f.close().await.map_err(new_std_io_error)?;
Expand All @@ -295,11 +348,14 @@ impl Accessor for HdfsBackend {
}

let f = open_options
.async_open(&p)
.async_open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;

Ok((RpWrite::new(), HdfsWriter::new(f)))
Ok((
RpWrite::new(),
HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
))
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
Expand Down Expand Up @@ -438,23 +494,37 @@ impl Accessor for HdfsBackend {
}

fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let p = build_rooted_abs_path(&self.root, path);
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = build_rooted_abs_path(&self.root, path);
let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));

if let Err(err) = self.client.metadata(&p) {
// If the target file exists, we should append to the end of it directly.
if op.append() && self.client.metadata(&target_path).is_ok() {
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = build_rooted_abs_path(&self.root, path);

(p, None)
};

if let Err(err) = self.client.metadata(&target_path) {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = get_parent(&p);
let parent = get_parent(&target_path);

self.client.create_dir(parent).map_err(new_std_io_error)?;

self.client
.open_file()
.create(true)
.write(true)
.open(&p)
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(new_std_io_error)?;
}

Expand All @@ -466,9 +536,14 @@ impl Accessor for HdfsBackend {
open_options.write(true);
}

let f = open_options.open(&p).map_err(new_std_io_error)?;
let f = open_options
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(new_std_io_error)?;

Ok((RpWrite::new(), HdfsWriter::new(f)))
Ok((
RpWrite::new(),
HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
))
}

fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
Expand Down
85 changes: 75 additions & 10 deletions core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,58 @@
// specific language governing permissions and limitations
// under the License.

use futures::future::BoxFuture;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::task::Context;
use std::sync::Arc;
use std::task::Poll;
use std::task::{ready, Context};

use async_trait::async_trait;
use futures::AsyncWrite;
use futures::{AsyncWrite, FutureExt};

use crate::raw::*;
use crate::*;

// A simple wrapper around a boxed close/rename future so it can be stored in
// `HdfsWriter` and driven across `poll_close` calls. The wrapper also carries
// the (unchecked) `Send + Sync` markers declared below.
struct SyncFutureWrapper(pub BoxFuture<'static, Result<()>>);
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved

impl Future for SyncFutureWrapper {
    type Output = Result<()>;

    /// Forward polling straight to the boxed future held by the wrapper.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.0.as_mut().poll(cx)
    }
}

// Explicitly mark SyncFutureWrapper as Send and Sync.
//
// SAFETY: `BoxFuture<'static, _>` is already `Send`, so the `Send` impl is
// redundant (but harmless). The `Sync` impl is an unchecked assertion: the
// boxed future is `dyn Future + Send`, NOT `Sync`.
// NOTE(review): this is only sound if the wrapper is never accessed through a
// shared reference from multiple threads at once — confirm, or consider
// requiring `Sync` on the boxed future instead of these unsafe impls.
unsafe impl Send for SyncFutureWrapper {}
unsafe impl Sync for SyncFutureWrapper {}

/// Writer for hdfs that optionally writes into a temporary file first and
/// renames it to the target path on close (atomic write).
pub struct HdfsWriter<F> {
// Final destination path of the write.
target_path: String,
// Temporary path written to when atomic write is enabled; `None` means
// bytes go directly to `target_path`.
tmp_path: Option<String>,
// Underlying hdrs file handle; `F` selects async vs blocking per impl.
f: F,
// Shared hdfs client, used to rename tmp -> target on close.
client: Arc<hdrs::Client>,
// In-flight close/rename future, created lazily by `poll_close`.
fut: Option<SyncFutureWrapper>,
}

impl<F> HdfsWriter<F> {
pub fn new(f: F) -> Self {
Self { f }
pub fn new(
target_path: String,
tmp_path: Option<String>,
f: F,
client: Arc<hdrs::Client>,
) -> Self {
Self {
target_path,
tmp_path,
f,
client,
fut: None,
}
}
}

Expand All @@ -44,18 +78,42 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
.map_err(new_std_io_error)
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
if let Some(mut fut) = self.fut.take() {
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved
let res = ready!(fut.poll_unpin(cx));
return Poll::Ready(res);
}

let _ = Pin::new(&mut self.f)
.poll_close(cx)
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved
.map_err(new_std_io_error);

// Clone client to allow move into the future.
let tmp_path = self.tmp_path.clone();
let client = Arc::clone(&self.client);
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved
let target_path = self.target_path.clone();

let fut = SyncFutureWrapper(Box::pin(async move {
if let Some(tmp_path) = tmp_path {
client
.rename_file(&tmp_path, &target_path)
.map_err(new_std_io_error)?;
}

Ok(())
}));

self.fut = Some(fut);
}
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"HdfsWriter doesn't support abort",
)))
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.f)
.poll_close(cx)
.map_err(new_std_io_error)
}
}

impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
Expand All @@ -66,6 +124,13 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
fn close(&mut self) -> Result<()> {
self.f.flush().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
let client = Arc::as_ref(&self.client);
shbhmrzd marked this conversation as resolved.
Show resolved Hide resolved
client
.rename_file(tmp_path, &self.target_path)
.map_err(new_std_io_error)?;
}

Ok(())
}
}
Loading