Skip to content

Commit

Permalink
add correct error propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Sep 21, 2023
1 parent eb94b2f commit 325bc33
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![feature(async_closure)]

#![feature(let_chains)]
mod azure_blob;
mod google_cloud;
mod http;
Expand Down
31 changes: 19 additions & 12 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,27 @@ pub(crate) async fn recursive_iter(
uri: &str,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let (to_rtn_tx, mut to_rtn_rx) = tokio::sync::mpsc::channel(16 * 1024);
fn add_to_channel(source: Arc<dyn ObjectSource>, tx: Sender<FileMetadata>, dir: String) {
fn add_to_channel(
source: Arc<dyn ObjectSource>,
tx: Sender<super::Result<FileMetadata>>,
dir: String,
) {
tokio::spawn(async move {
let mut s = source.iter_dir(&dir, None, None).await.unwrap();
let s = source.iter_dir(&dir, None, None).await;
let mut s = match s {
Ok(s) => s,
Err(e) => {
tx.send(Err(e)).await.unwrap();
return;
}
};
let tx = &tx;
while let Some(tr) = s.next().await {
let tr = tr.unwrap();
match tr.filetype {
FileType::File => tx.send(tr).await.unwrap(),
FileType::Directory => {
let dirpath = tr.filepath.clone();
tx.send(tr).await.unwrap();
add_to_channel(source.clone(), tx.clone(), dirpath)
}
};
let tr = tr;
if let Ok(ref tr) = tr && matches!(tr.filetype, FileType::Directory) {
add_to_channel(source.clone(), tx.clone(), tr.filepath.clone())
}
tx.send(tr).await.unwrap();
}
});
}
Expand All @@ -139,7 +146,7 @@ pub(crate) async fn recursive_iter(

let to_rtn_stream = stream! {
while let Some(v) = to_rtn_rx.recv().await {
yield Ok(v)
yield v
}
};

Expand Down
15 changes: 15 additions & 0 deletions tests/integration/io/test_list_files_s3_minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,18 @@ def test_single_file_directory_listing(minio_io_config, recursive):
s3fs_result = s3fs_recursive_list(fs, path=f"s3://{bucket_name}/c/cc/ccc")
assert len(daft_ls_result) == 1
compare_s3_result(daft_ls_result, s3fs_result)


@pytest.mark.integration()
@pytest.mark.parametrize(
"recursive",
[False, True],
)
def test_missing_file_path(minio_io_config, recursive):
bucket_name = "bucket"
with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
files = ["a", "b/bb", "c/cc/ccc"]
for name in files:
fs.write_bytes(f"s3://{bucket_name}/{name}", b"")
with pytest.raises(FileNotFoundError, match=f"s3://{bucket_name}/c/cc/ddd"):
daft_ls_result = io_list(f"s3://{bucket_name}/c/cc/ddd", io_config=minio_io_config, recursive=recursive)

0 comments on commit 325bc33

Please sign in to comment.