Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Add local native filesystem globbing. #1449

Merged
merged 6 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ rand = "^0.8"
serde_json = "1.0.104"
snafu = "0.7.4"
tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}

[workspace.dependencies.arrow2]
git = "https://github.com/Eventual-Inc/arrow2"
Expand Down
5 changes: 4 additions & 1 deletion src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum DaftError {
path: String,
source: GenericError,
},
InternalError(String),
External(GenericError),
}

Expand All @@ -31,7 +32,8 @@ impl std::error::Error for DaftError {
| DaftError::TypeError(_)
| DaftError::ComputeError(_)
| DaftError::ArrowError(_)
| DaftError::ValueError(_) => None,
| DaftError::ValueError(_)
| DaftError::InternalError(_) => None,
DaftError::IoError(io_error) => Some(io_error),
DaftError::FileNotFound { source, .. } | DaftError::External(source) => Some(&**source),
#[cfg(feature = "python")]
Expand Down Expand Up @@ -96,6 +98,7 @@ impl Display for DaftError {
Self::ComputeError(s) => write!(f, "DaftError::ComputeError {s}"),
Self::ArrowError(s) => write!(f, "DaftError::ArrowError {s}"),
Self::ValueError(s) => write!(f, "DaftError::ValueError {s}"),
Self::InternalError(s) => write!(f, "DaftError::InternalError {s}"),
#[cfg(feature = "python")]
Self::PyO3Error(e) => write!(f, "DaftError::PyO3Error {e}"),
Self::IoError(e) => write!(f, "DaftError::IoError {e}"),
Expand Down
1 change: 1 addition & 0 deletions src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde = {workspace = true}
serde_json = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
url = "2.4.0"

[dependencies.reqwest]
Expand Down
211 changes: 165 additions & 46 deletions src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use std::io::SeekFrom;
use std::ops::Range;
use std::path::PathBuf;

use crate::object_io::LSResult;
use crate::object_io::{self, FileMetadata, LSResult};

use super::object_io::{GetResult, ObjectSource};
use super::Result;
use async_trait::async_trait;
use bytes::Bytes;
use common_error::DaftError;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::TryStreamExt;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
Expand All @@ -33,6 +37,21 @@ enum Error {
source: std::io::Error,
},

#[snafu(display("Unable to fetch file metadata for file {}: {}", path, source))]
UnableToFetchFileMetadata {
path: String,
source: std::io::Error,
},

#[snafu(display("Unable to get entries for directory {}: {}", path, source))]
UnableToFetchDirectoryEntries {
path: String,
source: std::io::Error,
},

#[snafu(display("Unexpected symlink when processing directory {}: {}", path, source))]
UnexpectedSymlink { path: String, source: DaftError },

#[snafu(display("Unable to parse URL \"{}\"", url.to_string_lossy()))]
InvalidUrl { url: PathBuf, source: ParseError },

Expand All @@ -44,7 +63,9 @@ impl From<Error> for super::Error {
fn from(error: Error) -> Self {
use Error::*;
match error {
UnableToOpenFile { path, source } => {
UnableToOpenFile { path, source }
| UnableToFetchFileMetadata { path, source }
| UnableToFetchDirectoryEntries { path, source } => {
use std::io::ErrorKind::*;
match source.kind() {
NotFound => super::Error::NotFound {
Expand Down Expand Up @@ -84,49 +105,104 @@ pub struct LocalFile {
#[async_trait]
impl ObjectSource for LocalSource {
async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
const TO_STRIP: &str = "file://";
if let Some(p) = uri.strip_prefix(TO_STRIP) {
let path = std::path::Path::new(p);
const LOCAL_PROTOCOL: &str = "file://";
if let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) {
Ok(GetResult::File(LocalFile {
path: path.to_path_buf(),
path: uri.into(),
range,
}))
} else {
return Err(Error::InvalidFilePath {
path: uri.to_string(),
}
.into());
Err(Error::InvalidFilePath { path: uri.into() }.into())
}
}

async fn get_size(&self, uri: &str) -> super::Result<usize> {
const TO_STRIP: &str = "file://";
if let Some(p) = uri.strip_prefix(TO_STRIP) {
let path = std::path::Path::new(p);
let file = tokio::fs::File::open(path)
.await
.context(UnableToOpenFileSnafu {
path: path.to_string_lossy(),
})?;
let metadata = file.metadata().await.context(UnableToOpenFileSnafu {
path: path.to_string_lossy(),
})?;
return Ok(metadata.len() as usize);
} else {
return Err(Error::InvalidFilePath {
const LOCAL_PROTOCOL: &str = "file://";
let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) else {
return Err(Error::InvalidFilePath { path: uri.into() }.into());
};
let meta = tokio::fs::metadata(uri)
.await
.context(UnableToFetchFileMetadataSnafu {
path: uri.to_string(),
}
.into());
}
})?;
Ok(meta.len() as usize)
}

async fn ls(
&self,
_path: &str,
path: &str,
_delimiter: Option<&str>,
_continuation_token: Option<&str>,
) -> super::Result<LSResult> {
unimplemented!("local ls");
let s = self.iter_dir(path, None, None).await?;
let files = s.try_collect::<Vec<_>>().await?;
Ok(LSResult {
files,
continuation_token: None,
})
}

async fn iter_dir(
&self,
uri: &str,
_delimiter: Option<&str>,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
const LOCAL_PROTOCOL: &str = "file://";
let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) else {
return Err(Error::InvalidFilePath { path: uri.into() }.into());
};
let meta =
tokio::fs::metadata(uri)
.await
.with_context(|_| UnableToFetchFileMetadataSnafu {
path: uri.to_string(),
})?;
if meta.file_type().is_file() {
// Provided uri points to a file, so only return that file.
return Ok(futures::stream::iter([Ok(FileMetadata {
filepath: format!("{}{}", LOCAL_PROTOCOL, uri),
size: Some(meta.len()),
filetype: object_io::FileType::File,
})])
.boxed());
}
let dir_entries = tokio::fs::read_dir(uri).await.with_context(|_| {
UnableToFetchDirectoryEntriesSnafu {
path: uri.to_string(),
}
})?;
let dir_stream = tokio_stream::wrappers::ReadDirStream::new(dir_entries);
let uri = Arc::new(uri.to_string());
let file_meta_stream = dir_stream.then(move |entry| {
let uri = uri.clone();
async move {
let entry = entry.with_context(|_| UnableToFetchDirectoryEntriesSnafu {
path: uri.to_string(),
})?;
let meta = tokio::fs::metadata(entry.path()).await.with_context(|_| {
UnableToFetchFileMetadataSnafu {
path: entry.path().to_string_lossy().to_string(),
}
})?;
Ok(FileMetadata {
filepath: format!(
"{}{}{}",
LOCAL_PROTOCOL,
entry.path().to_string_lossy(),
if meta.is_dir() { "/" } else { "" }
),
size: Some(meta.len()),
filetype: meta.file_type().try_into().with_context(|_| {
UnexpectedSymlinkSnafu {
path: entry.path().to_string_lossy().to_string(),
}
})?,
})
}
});
Ok(file_meta_stream.boxed())
}
}

Expand Down Expand Up @@ -171,16 +247,15 @@ pub(crate) async fn collect_file(local_file: LocalFile) -> Result<Bytes> {
#[cfg(test)]

mod tests {

use std::io::Write;

use crate::object_io::ObjectSource;
use crate::object_io::{FileMetadata, FileType, ObjectSource};
use crate::Result;
use crate::{HttpSource, LocalSource};

#[tokio::test]
async fn test_full_get_from_local() -> Result<()> {
let mut file1 = tempfile::NamedTempFile::new().unwrap();
async fn write_remote_parquet_to_local_file(
f: &mut tempfile::NamedTempFile,
) -> Result<bytes::Bytes> {
let parquet_file_path = "https://daft-public-data.s3.us-west-2.amazonaws.com/test_fixtures/parquet_small/0dad4c3f-da0d-49db-90d8-98684571391b-0.parquet";
let parquet_expected_md5 = "929674747af64a98aceaa6d895863bd3";

Expand All @@ -190,45 +265,89 @@ mod tests {
let all_bytes = bytes.as_ref();
let checksum = format!("{:x}", md5::compute(all_bytes));
assert_eq!(checksum, parquet_expected_md5);
file1.write_all(all_bytes).unwrap();
file1.flush().unwrap();
f.write_all(all_bytes).unwrap();
f.flush().unwrap();
Ok(bytes)
}

#[tokio::test]
async fn test_local_full_get() -> Result<()> {
let mut file1 = tempfile::NamedTempFile::new().unwrap();
let bytes = write_remote_parquet_to_local_file(&mut file1).await?;

let parquet_file_path = format!("file://{}", file1.path().to_str().unwrap());
let client = LocalSource::get_client().await?;

let try_all_bytes = client.get(&parquet_file_path, None).await?.bytes().await?;
assert_eq!(try_all_bytes.len(), all_bytes.len());
assert_eq!(try_all_bytes.as_ref(), all_bytes);
assert_eq!(try_all_bytes.len(), bytes.len());
assert_eq!(try_all_bytes, bytes);

let first_bytes = client
.get_range(&parquet_file_path, 0..10)
.await?
.bytes()
.await?;
assert_eq!(first_bytes.len(), 10);
assert_eq!(first_bytes.as_ref(), &all_bytes[..10]);
assert_eq!(first_bytes.as_ref(), &bytes[..10]);

let first_bytes = client
.get_range(&parquet_file_path, 10..100)
.await?
.bytes()
.await?;
assert_eq!(first_bytes.len(), 90);
assert_eq!(first_bytes.as_ref(), &all_bytes[10..100]);
assert_eq!(first_bytes.as_ref(), &bytes[10..100]);

let last_bytes = client
.get_range(
&parquet_file_path,
(all_bytes.len() - 10)..(all_bytes.len() + 10),
)
.get_range(&parquet_file_path, (bytes.len() - 10)..(bytes.len() + 10))
.await?
.bytes()
.await?;
assert_eq!(last_bytes.len(), 10);
assert_eq!(last_bytes.as_ref(), &all_bytes[(all_bytes.len() - 10)..]);
assert_eq!(last_bytes.as_ref(), &bytes[(bytes.len() - 10)..]);

let size_from_get_size = client.get_size(parquet_file_path.as_str()).await?;
assert_eq!(size_from_get_size, all_bytes.len());
assert_eq!(size_from_get_size, bytes.len());

Ok(())
}

#[tokio::test]
async fn test_local_full_ls() -> Result<()> {
let dir = tempfile::tempdir().unwrap();
let mut file1 = tempfile::NamedTempFile::new_in(dir.path()).unwrap();
write_remote_parquet_to_local_file(&mut file1).await?;
let mut file2 = tempfile::NamedTempFile::new_in(dir.path()).unwrap();
write_remote_parquet_to_local_file(&mut file2).await?;
let mut file3 = tempfile::NamedTempFile::new_in(dir.path()).unwrap();
write_remote_parquet_to_local_file(&mut file3).await?;
let dir_path = format!("file://{}", dir.path().to_string_lossy());
let client = LocalSource::get_client().await?;

let ls_result = client.ls(dir_path.as_ref(), None, None).await?;
let mut files = ls_result.files.clone();
// Ensure stable sort ordering of file paths before comparing with expected payload.
files.sort_by(|a, b| a.filepath.cmp(&b.filepath));
let mut expected = vec![
FileMetadata {
filepath: format!("file://{}", file1.path().to_string_lossy()),
size: Some(file1.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file2.path().to_string_lossy()),
size: Some(file2.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file3.path().to_string_lossy()),
size: Some(file3.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
];
expected.sort_by(|a, b| a.filepath.cmp(&b.filepath));
assert_eq!(files, expected);
assert_eq!(ls_result.continuation_token, None);

Ok(())
}
Expand Down
25 changes: 23 additions & 2 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_error::DaftError;
use futures::stream::{BoxStream, Stream};
use futures::StreamExt;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -52,12 +53,32 @@ impl GetResult {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum FileType {
File,
Directory,
}
#[derive(Debug)]

impl TryFrom<std::fs::FileType> for FileType {
type Error = DaftError;

fn try_from(value: std::fs::FileType) -> Result<Self, Self::Error> {
if value.is_dir() {
Ok(Self::Directory)
} else if value.is_file() {
Ok(Self::File)
} else if value.is_symlink() {
Err(DaftError::InternalError(format!("Symlinks should never be encountered when constructing FileMetadata, but got: {:?}", value)))
} else {
unreachable!(
"Can only be a directory, file, or symlink, but got: {:?}",
value
)
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct FileMetadata {
pub filepath: String,
pub size: Option<u64>,
Expand Down
Loading
Loading