Skip to content

Commit

Permalink
[CHORE] Convert GlobScanOperator to perform streaming into result and take a list of glob paths (#1577)
Browse files Browse the repository at this point in the history

1. Converts `GlobScanOperator` to use iterators and tokio channels to
provide end-to-end streaming of glob results.
2. Allows `GlobScanOperator` to be created from a list of glob paths,
whose results are chained into a single returned stream.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Nov 8, 2023
1 parent a59b946 commit 96b73d6
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class ScanOperatorHandle:
) -> ScanOperatorHandle: ...
@staticmethod
def glob_scan(
glob_path: str,
glob_path: list[str],
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
schema: PySchema | None = None,
Expand Down
21 changes: 4 additions & 17 deletions daft/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,15 @@ def _get_tabular_files_scan(

scan_op: ScanOperatorHandle
if isinstance(path, list):
# Eagerly globs each path and fallback to AnonymousScanOperator.
# NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config)

# TODO: Should we move this into the AnonymousScanOperator itself?
# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)

scan_op = ScanOperatorHandle.anonymous_scan(
file_infos.file_paths,
inferred_or_provided_schema._schema,
scan_op = ScanOperatorHandle.glob_scan(
path,
file_format_config,
storage_config,
schema=schema_hint._schema if schema_hint is not None else None,
)
elif isinstance(path, str):
scan_op = ScanOperatorHandle.glob_scan(
path,
[path],
file_format_config,
storage_config,
schema=schema_hint._schema if schema_hint is not None else None,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl ObjectSource for AzureBlobSource {
page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
use crate::object_store_glob::glob;

// Ensure fanout_limit is not None to prevent runaway concurrency
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl ObjectSource for GCSSource {
page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
use crate::object_store_glob::glob;

// Ensure fanout_limit is not None to prevent runaway concurrency
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl ObjectSource for HttpSource {
_page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
use crate::object_store_glob::glob;

// Ensure fanout_limit is None because HTTP ObjectSource does not support prefix listing
Expand Down
14 changes: 6 additions & 8 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::runtime::RuntimeFlavor;

use std::{borrow::Cow, collections::HashMap, hash::Hash, ops::Range, sync::Arc};

use futures::{StreamExt, TryStreamExt};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};

use snafu::Snafu;
use url::ParseError;
Expand Down Expand Up @@ -168,18 +168,16 @@ impl IOClient {

pub async fn glob(
&self,
input: &str,
input: String,
fanout_limit: Option<usize>,
page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<Arc<IOStatsContext>>,
) -> Result<Vec<FileMetadata>> {
let (scheme, _) = parse_url(input)?;
) -> Result<BoxStream<'static, Result<FileMetadata>>> {
let (scheme, _) = parse_url(input.as_str())?;
let source = self.get_source(&scheme).await?;
let files: Vec<FileMetadata> = source
.glob(input, fanout_limit, page_size, limit, io_stats)
.await?
.try_collect()
let files = source
.glob(input.as_str(), fanout_limit, page_size, limit, io_stats)
.await?;
Ok(files)
}
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl ObjectSource for LocalSource {
_page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
use crate::object_store_glob::glob;

// Ensure fanout_limit is None because Local ObjectSource does not support prefix listing
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub(crate) trait ObjectSource: Sync + Send {
page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>>;
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>>;

async fn ls(
&self,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ impl ObjectSource for S3LikeSource {
page_size: Option<i32>,
limit: Option<usize>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
use crate::object_store_glob::glob;

// Ensure fanout_limit is not None to prevent runaway concurrency
Expand Down
1 change: 1 addition & 0 deletions src/daft-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ daft-io = {path = "../daft-io", default-features = false}
daft-parquet = {path = "../daft-parquet", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true}
serde = {workspace = true}
Expand Down
Loading

0 comments on commit 96b73d6

Please sign in to comment.