Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: reduce duplicated code for hdfs #5044

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 10 additions & 110 deletions core/src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,31 +237,12 @@ impl Access for HdfsBackend {
am.into()
}

async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);

self.client.create_dir(&p).map_err(new_std_io_error)?;

Ok(RpCreateDir::default())
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.blocking_create_dir(path, args)
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let mut m = Metadata::new(mode);
m.set_content_length(meta.len());
m.set_last_modified(meta.modified().into());

Ok(RpStat::new(m))
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.blocking_stat(path, args)
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
Expand Down Expand Up @@ -345,97 +326,16 @@ impl Access for HdfsBackend {
))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p);

if let Err(err) = meta {
return if err.kind() == io::ErrorKind::NotFound {
Ok(RpDelete::default())
} else {
Err(new_std_io_error(err))
};
}

// Safety: Err branch has been checked, it's OK to unwrap.
let meta = meta.ok().unwrap();

let result = if meta.is_dir() {
self.client.remove_dir(&p)
} else {
self.client.remove_file(&p)
};

result.map_err(new_std_io_error)?;

Ok(RpDelete::default())
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.blocking_delete(path, args)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for this change. However, it's not advisable to use a blocking API call directly (even though we currently do it this way). The ideal approach is to add an async API in hdrs first.

}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);

let f = match self.client.read_dir(&p) {
Ok(f) => f,
Err(e) => {
return if e.kind() == io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
}
}
};

let rd = HdfsLister::new(&self.root, f);

Ok((RpList::default(), Some(rd)))
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.blocking_list(path, args)
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from_path = build_rooted_abs_path(&self.root, from);
self.client.metadata(&from_path).map_err(new_std_io_error)?;

let to_path = build_rooted_abs_path(&self.root, to);
let result = self.client.metadata(&to_path);
match result {
Err(err) => {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = PathBuf::from(&to_path)
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", &to_path)
})?
.to_path_buf();

self.client
.create_dir(&parent.to_string_lossy())
.map_err(new_std_io_error)?;
}
Ok(metadata) => {
if metadata.is_file() {
self.client
.remove_file(&to_path)
.map_err(new_std_io_error)?;
} else {
return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
.with_context("input", &to_path));
}
}
}

self.client
.rename_file(&from_path, &to_path)
.map_err(new_std_io_error)?;

Ok(RpRename::new())
async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
self.blocking_rename(from, to, args)
}

fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
Expand Down
Loading