Skip to content

Commit

Permalink
Remove io_list
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 3, 2023
1 parent caa086b commit c86ea5c
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 120 deletions.
61 changes: 0 additions & 61 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use common_error::DaftError;
use futures::stream::{BoxStream, Stream};
use futures::StreamExt;

use tokio::sync::mpsc::Sender;
use tokio::sync::OwnedSemaphorePermit;

use crate::local::{collect_file, LocalFile};
Expand Down Expand Up @@ -144,63 +143,3 @@ pub(crate) trait ObjectSource: Sync + Send {
Ok(s.boxed())
}
}

pub(crate) async fn recursive_iter(
source: Arc<dyn ObjectSource>,
uri: &str,
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
log::debug!(target: "recursive_iter", "starting recursive_iter: with top level of: {uri}");
let (to_rtn_tx, mut to_rtn_rx) = tokio::sync::mpsc::channel(16 * 1024);
fn add_to_channel(
source: Arc<dyn ObjectSource>,
tx: Sender<super::Result<FileMetadata>>,
dir: String,
) {
log::debug!(target: "recursive_iter", "recursive_iter: spawning task to list: {dir}");
let source = source.clone();
tokio::spawn(async move {
let s = source.iter_dir(&dir, "/", true, Some(1000)).await;
log::debug!(target: "recursive_iter", "started listing task for {dir}");
let mut s = match s {
Ok(s) => s,
Err(e) => {
log::debug!(target: "recursive_iter", "Error occurred when listing {dir}\nerror:\n{e}");
tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
return super::Result::<_, super::Error>::Ok(());
}
};
let tx = &tx;
while let Some(tr) = s.next().await {
match tr {
Ok(fm) => {
if matches!(fm.filetype, FileType::Directory) {
add_to_channel(source.clone(), tx.clone(), fm.filepath.clone())
}
tx.send(Ok(fm)).await.map_err(|e| {
super::Error::UnableToSendDataOverChannel { source: e.into() }
})?;
}
Err(e) => {
tx.send(Err(e)).await.map_err(|se| {
super::Error::UnableToSendDataOverChannel { source: se.into() }
})?;
}
}
}
log::debug!(target: "recursive_iter", "completed listing task for {dir}");
super::Result::Ok(())
});
}

add_to_channel(source, to_rtn_tx, uri.to_string());

let to_rtn_stream = stream! {
while let Some(v) = to_rtn_rx.recv().await {
yield v
}
};

Ok(to_rtn_stream.boxed())
}
52 changes: 1 addition & 51 deletions src/daft-io/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub use common_io_config::python::{AzureConfig, GCSConfig, IOConfig};
pub use py::register_modules;

mod py {
use crate::{get_io_client, get_runtime, object_io::recursive_iter, parse_url};
use crate::{get_io_client, get_runtime, parse_url};
use common_error::DaftResult;
use futures::TryStreamExt;
use pyo3::{
Expand Down Expand Up @@ -51,63 +51,13 @@ mod py {
Ok(PyList::new(py, to_rtn))
}

#[pyfunction]
fn io_list(
py: Python,
path: String,
recursive: Option<bool>,
multithreaded_io: Option<bool>,
io_config: Option<common_io_config::python::IOConfig>,
) -> PyResult<&PyList> {
let lsr: DaftResult<Vec<_>> = py.allow_threads(|| {
let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
io_config.unwrap_or_default().config.into(),
)?;
let (scheme, path) = parse_url(&path)?;
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();

runtime_handle.block_on(async move {
let source = io_client.get_source(&scheme).await?;
let files = if recursive.is_some_and(|r| r) {
recursive_iter(source, &path)
.await?
.try_collect::<Vec<_>>()
.await?
} else {
source
.iter_dir(&path, "/", true, None)
.await?
.try_collect::<Vec<_>>()
.await?
.into_iter()
.collect()
};

Ok(files)
})
});
let lsr = lsr?;
let mut to_rtn = vec![];
for file in lsr {
let dict = PyDict::new(py);
dict.set_item("type", format!("{:?}", file.filetype))?;
dict.set_item("path", file.filepath)?;
dict.set_item("size", file.size)?;
to_rtn.push(dict);
}
Ok(PyList::new(py, to_rtn))
}

#[pyfunction]
fn set_io_pool_num_threads(num_threads: i64) -> PyResult<bool> {
Ok(crate::set_io_pool_num_threads(num_threads as usize))
}

pub fn register_modules(py: Python, parent: &PyModule) -> PyResult<()> {
common_io_config::python::register_modules(py, parent)?;
parent.add_function(wrap_pyfunction!(io_list, parent)?)?;
parent.add_function(wrap_pyfunction!(io_glob, parent)?)?;
parent.add_function(wrap_pyfunction!(set_io_pool_num_threads, parent)?)?;

Expand Down
9 changes: 1 addition & 8 deletions tests/integration/io/benchmarks/test_benchmark_glob.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import s3fs

from daft.daft import io_glob, io_list
from daft.daft import io_glob

from ..conftest import minio_create_bucket

Expand Down Expand Up @@ -219,10 +219,3 @@ def test_benchmark_glob_daft(benchmark, setup_bucket, minio_io_config, fanout_li
)
)
assert len(results) == setup_bucket


@pytest.mark.benchmark(group="glob")
@pytest.mark.integration()
def test_benchmark_io_list_recursive_daft(benchmark, setup_bucket, minio_io_config):
results = benchmark(lambda: io_list(f"s3://{BUCKET}/", io_config=minio_io_config, recursive=True))
assert len([r for r in results if r["type"] == "File"]) == setup_bucket

0 comments on commit c86ea5c

Please sign in to comment.