feat(services/hdfs): Atomic write for hdfs #3875

Merged (27 commits, Jan 3, 2024)
Commits (27):
848b22e Add HdfsConfig to implement ConfigDeserializer (shbhmrzd, Dec 22, 2023)
6e51ac2 Merge remote-tracking branch 'upstream/main' into add_hdfs_config (shbhmrzd, Dec 22, 2023)
d06bf66 fix conflicting impl debug (shbhmrzd, Dec 22, 2023)
50665a5 add doc for struct fields (shbhmrzd, Dec 22, 2023)
0fb22fd atomic write support for hdfs (shbhmrzd, Dec 31, 2023)
96fba38 Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write (shbhmrzd, Dec 31, 2023)
b5c96e3 use arc of hdrs client (shbhmrzd, Jan 1, 2024)
dfa0e71 fix issues with test (shbhmrzd, Jan 1, 2024)
8b44433 cargo fmt (shbhmrzd, Jan 1, 2024)
e93b17d Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write (shbhmrzd, Jan 1, 2024)
f3d7bf6 revert .env.example (shbhmrzd, Jan 1, 2024)
2179349 impl sync for HdfsWriter (shbhmrzd, Jan 1, 2024)
a592a0d Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write (shbhmrzd, Jan 1, 2024)
af8f51d take fut as mut (shbhmrzd, Jan 1, 2024)
d7fd22a add atomic write test workflow (shbhmrzd, Jan 1, 2024)
c0e465b use Option<F> in HdfsWriter (shbhmrzd, Jan 2, 2024)
55587e3 Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write (shbhmrzd, Jan 2, 2024)
e313c5c revert .env change (shbhmrzd, Jan 2, 2024)
1895b9f no support for atomic write if append true (shbhmrzd, Jan 2, 2024)
474fe57 revert (shbhmrzd, Jan 2, 2024)
c5dd416 testing with adding atomic write dir env param (shbhmrzd, Jan 2, 2024)
465f9b1 revert (shbhmrzd, Jan 2, 2024)
dc3f8e8 add debug statements (shbhmrzd, Jan 2, 2024)
dafef0c remove debugs (shbhmrzd, Jan 2, 2024)
0d2a780 use f.close (shbhmrzd, Jan 2, 2024)
d92712d review comments (shbhmrzd, Jan 2, 2024)
e229a9e review comments (shbhmrzd, Jan 3, 2024)
37 changes: 37 additions & 0 deletions .github/workflows/service_test_hdfs.yml
@@ -136,3 +136,40 @@ jobs:
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
OPENDAL_HDFS_ENABLE_APPEND: true

hdfs-default-with-atomic-write-dir:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Setup Rust toolchain
uses: ./.github/actions/setup
with:
need-nextest: true

- name: Setup java env
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: "11"
- name: Setup hadoop env
shell: bash
run: |
curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner

- name: Test
shell: bash
working-directory: core
run: |
export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml

cargo test behavior --features tests,services-hdfs
env:
HADOOP_HOME: "/home/runner/hadoop-3.3.5"
OPENDAL_TEST: hdfs
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
OPENDAL_HDFS_NAME_NODE: default
OPENDAL_HDFS_ENABLE_APPEND: false
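
Note: the new job exercises the same option end to end. A minimal client-side sketch of the equivalent configuration (hypothetical paths; assumes the `services-hdfs` feature, a `tokio` runtime, and a reachable HDFS):

```rust
use opendal::services::Hdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let mut builder = Hdfs::default();
    // Mirrors OPENDAL_HDFS_NAME_NODE and OPENDAL_HDFS_ROOT above.
    builder.name_node("default");
    builder.root("/tmp/opendal/");
    // Mirrors OPENDAL_HDFS_ATOMIC_WRITE_DIR: writes land in a temp file
    // under this dir first, then are renamed onto the target on close.
    builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");

    let op = Operator::new(builder)?.finish();
    op.write("hello.txt", "Hello, HDFS!").await?;
    Ok(())
}
```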
98 changes: 86 additions & 12 deletions core/src/services/hdfs/backend.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;
use futures::AsyncWriteExt;
use log::debug;
use serde::Deserialize;
use uuid::Uuid;

use super::lister::HdfsLister;
use super::writer::HdfsWriter;
@@ -48,6 +49,8 @@ pub struct HdfsConfig {
pub user: Option<String>,
/// enable the append capacity
pub enable_append: bool,
/// atomic_write_dir of this backend
pub atomic_write_dir: Option<String>,
}

impl Debug for HdfsConfig {
@@ -133,6 +136,21 @@ impl HdfsBuilder {
self.config.enable_append = enable_append;
self
}

/// Set temp dir for atomic write.
///
/// # Notes
///
/// - When append is enabled, we will not use atomic write
///   to avoid data loss and performance issues.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.config.atomic_write_dir = if dir.is_empty() {
None
} else {
Some(String::from(dir))
};
self
}
}

impl Builder for HdfsBuilder {
@@ -181,19 +199,40 @@ impl Builder for HdfsBuilder {
}
}

let atomic_write_dir = self.config.atomic_write_dir.take();

// If the atomic write dir does not exist, we must create it.
if let Some(d) = &atomic_write_dir {
if let Err(e) = client.metadata(d) {
if e.kind() == io::ErrorKind::NotFound {
client.create_dir(d).map_err(new_std_io_error)?
}
}
}

debug!("backend build finished: {:?}", &self);
Ok(HdfsBackend {
root,
atomic_write_dir,
client: Arc::new(client),
enable_append: self.config.enable_append,
})
}
}

#[inline]
fn tmp_file_of(path: &str) -> String {
let name = get_basename(path);
let uuid = Uuid::new_v4().to_string();

format!("{name}.{uuid}")
}

/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
root: String,
atomic_write_dir: Option<String>,
client: Arc<hdrs::Client>,
enable_append: bool,
}
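
The `tmp_file_of` helper above names the temp file after the target's basename plus a fresh UUID, so concurrent writers to the same path never collide in the atomic write dir. A self-contained sketch of the same scheme (with `rsplit` standing in for the internal `get_basename` helper):

```rust
use uuid::Uuid;

// Same scheme as tmp_file_of above: "<basename>.<uuid>".
fn tmp_name(path: &str) -> String {
    let name = path.rsplit('/').next().unwrap_or(path);
    format!("{name}.{}", Uuid::new_v4())
}

fn main() {
    // Prints e.g. "data.csv.5b39...": unique per writer invocation.
    println!("{}", tmp_name("dir/data.csv"));
}
```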
@@ -263,15 +302,28 @@ impl Accessor for HdfsBackend {
}

async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = build_rooted_abs_path(&self.root, path);
let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));

// If the target file exists, we should append to the end of it directly.
if op.append() && self.client.metadata(&target_path).is_ok() {
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = build_rooted_abs_path(&self.root, path);
(p, None)
};

if let Err(err) = self.client.metadata(&target_path) {
// Return early if any other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = get_parent(&target_path);

self.client.create_dir(parent).map_err(new_std_io_error)?;

@@ -280,7 +332,7 @@ impl Accessor for HdfsBackend {
.open_file()
.create(true)
.write(true)
.async_open(&target_path)
.await
.map_err(new_std_io_error)?;
f.close().await.map_err(new_std_io_error)?;
@@ -295,11 +347,14 @@ impl Accessor for HdfsBackend {
}

let f = open_options
.async_open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;

Ok((
RpWrite::new(),
HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
))
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
@@ -438,23 +493,37 @@ impl Accessor for HdfsBackend {
}

fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = build_rooted_abs_path(&self.root, path);
let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));

// If the target file exists, we should append to the end of it directly.
if op.append() && self.client.metadata(&target_path).is_ok() {
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = build_rooted_abs_path(&self.root, path);

(p, None)
};

if let Err(err) = self.client.metadata(&target_path) {
// Return early if any other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = get_parent(&target_path);

self.client.create_dir(parent).map_err(new_std_io_error)?;

self.client
.open_file()
.create(true)
.write(true)
.open(&target_path)
.map_err(new_std_io_error)?;
}

@@ -466,9 +535,14 @@ impl Accessor for HdfsBackend {
open_options.write(true);
}

let f = open_options
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(new_std_io_error)?;

Ok((
RpWrite::new(),
HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
))
}

fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
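
Taken together, the backend now implements write-to-temp-then-rename: with `atomic_write_dir` set, the writer opens a temp file there and `rename_file` publishes it to the target path on close. A local-filesystem sketch of the same pattern, with `std::fs` standing in for the hdrs client (rename is atomic only when both paths are on the same filesystem, which is why the temp dir must live on the same HDFS instance as the target):

```rust
use std::fs::{self, File};
use std::io::Write;

// Sketch only: std::fs::rename plays the role of hdrs rename_file.
fn atomic_write(target: &str, tmp: &str, data: &[u8]) -> std::io::Result<()> {
    let mut f = File::create(tmp)?;
    f.write_all(data)?;
    f.sync_all()?; // flush before publishing
    // Readers of `target` observe either the old or the new content,
    // never a partially written file.
    fs::rename(tmp, target)
}

fn main() -> std::io::Result<()> {
    atomic_write("/tmp/demo.txt", "/tmp/demo.txt.tmp", b"hello")
}
```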
82 changes: 68 additions & 14 deletions core/src/services/hdfs/writer.rs
@@ -15,56 +15,110 @@
// specific language governing permissions and limitations
// under the License.

use futures::future::BoxFuture;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{ready, Context};

use async_trait::async_trait;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt};

use crate::raw::*;
use crate::*;

pub struct HdfsWriter<F> {
target_path: String,
tmp_path: Option<String>,
f: Option<F>,
client: Arc<hdrs::Client>,
fut: Option<BoxFuture<'static, Result<()>>>,
}

/// # Safety
///
/// We only ever take `&mut Self` references to HdfsWriter.
unsafe impl<F> Sync for HdfsWriter<F> {}

impl<F> HdfsWriter<F> {
pub fn new(
target_path: String,
tmp_path: Option<String>,
f: F,
client: Arc<hdrs::Client>,
) -> Self {
Self {
target_path,
tmp_path,
f: Some(f),
client,
fut: None,
}
}
}

#[async_trait]
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
let f = self.f.as_mut().expect("HdfsWriter must be initialized");

Pin::new(f)
.poll_write(cx, bs.chunk())
.map_err(new_std_io_error)
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
}

let mut f = self.f.take().expect("HdfsWriter must be initialized");
let tmp_path = self.tmp_path.clone();
let target_path = self.target_path.clone();
// Clone the client so it can be moved into the future.
let client = self.client.clone();

self.fut = Some(Box::pin(async move {
f.close().await.map_err(new_std_io_error)?;

if let Some(tmp_path) = tmp_path {
client
.rename_file(&tmp_path, &target_path)
.map_err(new_std_io_error)?;
}

Ok(())
}));
}
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"HdfsWriter doesn't support abort",
)))
}

}

impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
f.write(bs.chunk()).map_err(new_std_io_error)
}

fn close(&mut self) -> Result<()> {
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
f.flush().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
self.client
.rename_file(tmp_path, &self.target_path)
.map_err(new_std_io_error)?;
}

Ok(())
}
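
The `Option<F>` and `Option<BoxFuture>` fields above exist because `poll_close` must move the file out of `self` into a `'static` future, then keep polling that future across wakeups. A stripped-down sketch of the take-then-poll pattern (with a `String` standing in for `hdrs::AsyncFile`):

```rust
use std::task::{ready, Context, Poll};

use futures::future::BoxFuture;
use futures::FutureExt;

struct Closer {
    resource: Option<String>, // stand-in for the hdrs::AsyncFile
    fut: Option<BoxFuture<'static, std::io::Result<()>>>,
}

impl Closer {
    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        loop {
            // Later polls: drive the stored future to completion.
            if let Some(fut) = self.fut.as_mut() {
                let res = ready!(fut.poll_unpin(cx));
                self.fut = None;
                return Poll::Ready(res);
            }

            // First poll: take the resource so the async block owns it,
            // which is what lets the boxed future be 'static.
            let resource = self.resource.take().expect("must be initialized");
            self.fut = Some(Box::pin(async move {
                drop(resource); // stand-in for `f.close().await` plus rename
                Ok(())
            }));
        }
    }
}
```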