diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index acb8d452f426..d6191222e7e4 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -155,6 +155,33 @@ pub struct HdfsBackend { unsafe impl Send for HdfsBackend {} unsafe impl Sync for HdfsBackend {} +impl HdfsBackend { + fn create_parent_if_need(&self, path: &str) -> Result<()>{ + if let Err(err) = self.client.metadata(path) { + // Early return if other error happened. + if err.kind() != io::ErrorKind::NotFound { + return Err(new_std_io_error(err)); + } + + let parent = PathBuf::from(path) + .parent() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "path should have parent but not, it must be malformed", + ).with_context("input", path) + })? + .to_path_buf(); + + self.client + .create_dir(&parent.to_string_lossy()) + .map_err(new_std_io_error)?; + } + + Ok(()) + } +} + #[async_trait] impl Accessor for HdfsBackend { type Reader = oio::FuturesReader; @@ -184,6 +211,7 @@ impl Accessor for HdfsBackend { list: true, list_without_recursive: true, + rename: true, blocking: true, ..Default::default() @@ -250,6 +278,16 @@ impl Accessor for HdfsBackend { Ok((RpWrite::new(), HdfsWriter::new(f))) } + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + let from_path = build_rooted_abs_path(&self.root, from); + let to_path = build_rooted_abs_path(&self.root, to); + self.create_parent_if_need(&to_path)?; + + self.client.rename_file( &from_path, &to_path).map_err(new_std_io_error())?; + + Ok(RpRename::default()) + } + async fn stat(&self, path: &str, _: OpStat) -> Result { let p = build_rooted_abs_path(&self.root, path); @@ -367,6 +405,16 @@ impl Accessor for HdfsBackend { Ok((RpWrite::new(), HdfsWriter::new(f))) } + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { + let from_path = build_rooted_abs_path(&self.root, from); + let to_path = build_rooted_abs_path(&self.root, to); + self.create_parent_if_need(&to_path)?; + + self.client.rename_file( &from_path, &to_path).map_err(new_std_io_error)?; + + Ok(RpRename::default()) + } + fn blocking_stat(&self, path: &str, _: OpStat) -> Result { let p = build_rooted_abs_path(&self.root, path);