diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 3533ad0be6b..1e1dae9f7e6 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -136,3 +136,40 @@ jobs:
           OPENDAL_HDFS_ROOT: /tmp/opendal/
           OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
           OPENDAL_HDFS_ENABLE_APPEND: true
+
+  hdfs-default-with-atomic-write-dir:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+        with:
+          need-nextest: true
+
+      - name: Setup java env
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: "11"
+      - name: Setup hadoop env
+        shell: bash
+        run: |
+          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: |
+          export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
+          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
+
+          cargo test behavior --features tests,services-hdfs
+        env:
+          HADOOP_HOME: "/home/runner/hadoop-3.3.5"
+          OPENDAL_TEST: hdfs
+          OPENDAL_HDFS_ROOT: /tmp/opendal/
+          OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
+          OPENDAL_HDFS_NAME_NODE: default
+          OPENDAL_HDFS_ENABLE_APPEND: false
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b9255a16d1e..cd2ee0b2623 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;
 use futures::AsyncWriteExt;
 use log::debug;
 use serde::Deserialize;
+use uuid::Uuid;
 
 use super::lister::HdfsLister;
 use super::writer::HdfsWriter;
@@ -48,6 +49,8 @@ pub struct HdfsConfig {
     pub user: Option<String>,
     /// enable the append capacity
     pub enable_append: bool,
+    /// atomic_write_dir of this backend
+    pub atomic_write_dir: Option<String>,
 }
 
 impl Debug for HdfsConfig {
@@ -133,6 +136,21 @@ impl HdfsBuilder {
         self.config.enable_append = enable_append;
         self
     }
+
+    /// Set the temp dir for atomic write.
+    ///
+    /// # Notes
+    ///
+    /// - When append is enabled, we will not use atomic write
+    ///   to avoid data loss and performance issues.
+    pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
+        self.config.atomic_write_dir = if dir.is_empty() {
+            None
+        } else {
+            Some(String::from(dir))
+        };
+        self
+    }
 }
 
 impl Builder for HdfsBuilder {
@@ -181,19 +199,40 @@ impl Builder for HdfsBuilder {
             }
         }
 
+        let atomic_write_dir = self.config.atomic_write_dir.take();
+
+        // If the atomic write dir does not exist, we must create it.
+        if let Some(d) = &atomic_write_dir {
+            if let Err(e) = client.metadata(d) {
+                if e.kind() == io::ErrorKind::NotFound {
+                    client.create_dir(d).map_err(new_std_io_error)?
+                }
+            }
+        }
+
         debug!("backend build finished: {:?}", &self);
 
         Ok(HdfsBackend {
             root,
+            atomic_write_dir,
             client: Arc::new(client),
             enable_append: self.config.enable_append,
         })
     }
 }
 
+#[inline]
+fn tmp_file_of(path: &str) -> String {
+    let name = get_basename(path);
+    let uuid = Uuid::new_v4().to_string();
+
+    format!("{name}.{uuid}")
+}
+
 /// Backend for hdfs services.
 #[derive(Debug, Clone)]
 pub struct HdfsBackend {
     root: String,
+    atomic_write_dir: Option<String>,
     client: Arc<hdrs::Client>,
     enable_append: bool,
 }
@@ -263,15 +302,28 @@ impl Accessor for HdfsBackend {
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let p = build_rooted_abs_path(&self.root, path);
+        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+            let target_path = build_rooted_abs_path(&self.root, path);
+            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
+
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && self.client.metadata(&target_path).is_ok() {
+                (target_path, None)
+            } else {
+                (target_path, Some(tmp_path))
+            }
+        } else {
+            let p = build_rooted_abs_path(&self.root, path);
+            (p, None)
+        };
 
-        if let Err(err) = self.client.metadata(&p) {
+        if let Err(err) = self.client.metadata(&target_path) {
             // Early return if other error happened.
             if err.kind() != io::ErrorKind::NotFound {
                 return Err(new_std_io_error(err));
             }
 
-            let parent = get_parent(&p);
+            let parent = get_parent(&target_path);
 
             self.client.create_dir(parent).map_err(new_std_io_error)?;
 
@@ -280,7 +332,7 @@ impl Accessor for HdfsBackend {
                 .open_file()
                 .create(true)
                 .write(true)
-                .async_open(&p)
+                .async_open(&target_path)
                 .await
                 .map_err(new_std_io_error)?;
             f.close().await.map_err(new_std_io_error)?;
@@ -295,11 +347,14 @@ impl Accessor for HdfsBackend {
         }
 
         let f = open_options
-            .async_open(&p)
+            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
             .await
             .map_err(new_std_io_error)?;
 
-        Ok((RpWrite::new(), HdfsWriter::new(f)))
+        Ok((
+            RpWrite::new(),
+            HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+        ))
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
@@ -438,15 +493,29 @@ impl Accessor for HdfsBackend {
     }
 
     fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        let p = build_rooted_abs_path(&self.root, path);
+        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+            let target_path = build_rooted_abs_path(&self.root, path);
+            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
 
-        if let Err(err) = self.client.metadata(&p) {
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && self.client.metadata(&target_path).is_ok() {
+                (target_path, None)
+            } else {
+                (target_path, Some(tmp_path))
+            }
+        } else {
+            let p = build_rooted_abs_path(&self.root, path);
+
+            (p, None)
+        };
+
+        if let Err(err) = self.client.metadata(&target_path) {
             // Early return if other error happened.
             if err.kind() != io::ErrorKind::NotFound {
                 return Err(new_std_io_error(err));
             }
 
-            let parent = get_parent(&p);
+            let parent = get_parent(&target_path);
 
             self.client.create_dir(parent).map_err(new_std_io_error)?;
 
@@ -454,7 +523,7 @@ impl Accessor for HdfsBackend {
                 .open_file()
                 .create(true)
                 .write(true)
-                .open(&p)
+                .open(&target_path)
                 .map_err(new_std_io_error)?;
         }
 
@@ -466,9 +535,14 @@ impl Accessor for HdfsBackend {
             open_options.write(true);
         }
 
-        let f = open_options.open(&p).map_err(new_std_io_error)?;
+        let f = open_options
+            .open(tmp_path.as_ref().unwrap_or(&target_path))
+            .map_err(new_std_io_error)?;
 
-        Ok((RpWrite::new(), HdfsWriter::new(f)))
+        Ok((
+            RpWrite::new(),
+            HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+        ))
     }
 
     fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 4990df40a99..a2ddd5c0021 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -15,56 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use futures::future::BoxFuture;
 use std::io::Write;
 use std::pin::Pin;
-use std::task::Context;
+use std::sync::Arc;
 use std::task::Poll;
+use std::task::{ready, Context};
 
 use async_trait::async_trait;
-use futures::AsyncWrite;
+use futures::{AsyncWrite, AsyncWriteExt, FutureExt};
 
 use crate::raw::*;
 use crate::*;
 
 pub struct HdfsWriter<F> {
-    f: F,
+    target_path: String,
+    tmp_path: Option<String>,
+    f: Option<F>,
+    client: Arc<hdrs::Client>,
+    fut: Option<BoxFuture<'static, Result<()>>>,
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` references to HdfsWriter.
+unsafe impl<F> Sync for HdfsWriter<F> {}
+
 impl<F> HdfsWriter<F> {
-    pub fn new(f: F) -> Self {
-        Self { f }
+    pub fn new(
+        target_path: String,
+        tmp_path: Option<String>,
+        f: F,
+        client: Arc<hdrs::Client>,
+    ) -> Self {
+        Self {
+            target_path,
+            tmp_path,
+            f: Some(f),
+            client,
+            fut: None,
+        }
     }
 }
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        Pin::new(&mut self.f)
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+
+        Pin::new(f)
             .poll_write(cx, bs.chunk())
             .map_err(new_std_io_error)
     }
 
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
+
+            let mut f = self.f.take().expect("HdfsWriter must be initialized");
+            let tmp_path = self.tmp_path.clone();
+            let target_path = self.target_path.clone();
+            // Clone the client to allow moving it into the future.
+            let client = self.client.clone();
+
+            self.fut = Some(Box::pin(async move {
+                f.close().await.map_err(new_std_io_error)?;
+
+                if let Some(tmp_path) = tmp_path {
+                    client
+                        .rename_file(&tmp_path, &target_path)
+                        .map_err(new_std_io_error)?;
+                }
+
+                Ok(())
+            }));
+        }
+    }
+
     fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
         Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "HdfsWriter doesn't support abort",
         )))
     }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Pin::new(&mut self.f)
-            .poll_close(cx)
-            .map_err(new_std_io_error)
-    }
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        self.f.write(bs.chunk()).map_err(new_std_io_error)
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+
+        f.write(bs.chunk()).map_err(new_std_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
-        self.f.flush().map_err(new_std_io_error)?;
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+        f.flush().map_err(new_std_io_error)?;
+
+        if let Some(tmp_path) = &self.tmp_path {
+            self.client
+                .rename_file(tmp_path, &self.target_path)
+                .map_err(new_std_io_error)?;
+        }
 
         Ok(())
     }
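
A note for reviewers on how the pieces fit together: every non-append write is staged as a `{basename}.{uuid}` file under `atomic_write_dir` and renamed over the target when the writer closes. An HDFS rename is a single NameNode metadata operation, so readers observe either the old file or the complete new one, never a partial write. This is also why appends to an existing file bypass the tmp file (renaming a fresh tmp file over the target would discard the bytes already present), and why the new CI job sets `OPENDAL_HDFS_ENABLE_APPEND: false`. Below is a minimal usage sketch, assuming OpenDAL's public `Operator` API as of this patch; the paths are illustrative, not defaults:

```rust
use opendal::services::Hdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let mut builder = Hdfs::default();
    builder.name_node("default");
    builder.root("/tmp/opendal/");
    // Stage non-append writes here; passing an empty string disables
    // atomic write, per the builder method in this patch.
    builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");

    let op = Operator::new(builder)?.finish();

    // The file becomes visible at its final path only after the writer
    // closes and the tmp file is renamed into place.
    op.write("dir/hello.txt", "Hello, HDFS!").await?;
    Ok(())
}
```

One design consequence worth noting: because `rename_file` runs in `close()`, a writer that is dropped without being closed leaves its tmp file behind in `atomic_write_dir` rather than corrupting the target, which is the intended failure mode for an atomic-write scheme.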