From 7d6985d90c66e59f317dc2a3b012fb3d233d403e Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Fri, 25 Aug 2023 17:14:32 -0700 Subject: [PATCH] Unify file globbing logic by exposing FileInfos to Python. --- daft/execution/execution_step.py | 6 +- daft/execution/physical_plan.py | 7 +- daft/execution/physical_plan_factory.py | 1 - daft/execution/rust_physical_plan_shim.py | 5 +- daft/filesystem.py | 19 +- daft/io/common.py | 14 +- daft/io/file_path.py | 12 +- daft/logical/builder.py | 5 +- daft/logical/logical_plan.py | 40 ++--- daft/logical/optimizer.py | 2 - daft/logical/rust_logical_plan.py | 25 +-- daft/runners/pyrunner.py | 54 ++---- daft/runners/ray_runner.py | 72 +++----- daft/runners/runner_io.py | 32 +--- src/daft-plan/src/builder.rs | 10 +- src/daft-plan/src/lib.rs | 5 +- src/daft-plan/src/ops/source.rs | 4 +- src/daft-plan/src/physical_plan.rs | 20 +-- src/daft-plan/src/source_info.rs | 166 +++++++++++++++--- src/daft-plan/src/test/mod.rs | 4 +- tests/cookbook/test_dataloading.py | 38 ++-- tests/cookbook/test_image.py | 2 +- .../test_pushdown_clauses_into_scan.py | 3 - 23 files changed, 282 insertions(+), 264 deletions(-) diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index 66b5a7e539..9cdcfe5e99 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -319,7 +319,6 @@ class ReadFile(SingleOutputInstruction): fs: fsspec.AbstractFileSystem | None columns_to_read: list[str] | None file_format_config: FileFormatConfig - filepaths_column_name: str def run(self, inputs: list[Table]) -> list[Table]: return self._read_file(inputs) @@ -352,10 +351,7 @@ def _handle_tabular_files_scan( filepaths_partition: Table, ) -> Table: data = filepaths_partition.to_pydict() - assert ( - self.filepaths_column_name in data - ), f"TabularFilesScan should be ran on vPartitions with '{self.filepaths_column_name}' column" - filepaths = data[self.filepaths_column_name] + filepaths = data["file_paths"] if self.index is not None: filepaths = [filepaths[self.index]] diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 5bba024c5b..b983c78157 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -70,7 +70,6 @@ def file_read( fs: fsspec.AbstractFileSystem | None, columns_to_read: list[str] | None, file_format_config: FileFormatConfig, - filepaths_column_name: str, ) -> InProgressPhysicalPlan[PartitionT]: """child_plan represents partitions with filenames. @@ -85,8 +84,9 @@ def file_read( done_task = materializations.popleft() vpartition = done_task.vpartition() - file_sizes_bytes = vpartition.to_pydict()["size"] - file_rows = vpartition.to_pydict()["rows"] + file_infos = vpartition.to_pydict() + file_sizes_bytes = file_infos["file_sizes"] + file_rows = file_infos["num_rows"] # Emit one partition for each file (NOTE: hardcoded for now). for i in range(len(vpartition)): @@ -102,7 +102,6 @@ def file_read( fs=fs, columns_to_read=columns_to_read, file_format_config=file_format_config, - filepaths_column_name=filepaths_column_name, ), # Set the filesize as the memory request. # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.) 
diff --git a/daft/execution/physical_plan_factory.py b/daft/execution/physical_plan_factory.py index 45ddd7a823..3189381c97 100644 --- a/daft/execution/physical_plan_factory.py +++ b/daft/execution/physical_plan_factory.py @@ -42,7 +42,6 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) -> fs=node._fs, columns_to_read=node._column_names, file_format_config=node._file_format_config, - filepaths_column_name=node._filepaths_column_name, ) elif isinstance(node, logical_plan.Filter): diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index fe65687549..587679bc62 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -39,16 +39,13 @@ def tabular_scan( parts_t = cast(Iterator[PartitionT], parts) file_info_iter = physical_plan.partition_read(iter(parts_t)) - filepaths_column_name = get_context().runner().runner_io().FS_LISTING_PATH_COLUMN_NAME - pyschema = Schema._from_pyschema(schema) return physical_plan.file_read( child_plan=file_info_iter, limit_rows=limit, - schema=pyschema, + schema=Schema._from_pyschema(schema), fs=None, columns_to_read=columns_to_read, file_format_config=file_format_config, - filepaths_column_name=filepaths_column_name, ) diff --git a/daft/filesystem.py b/daft/filesystem.py index 51574d8db0..c1cf0c9190 100644 --- a/daft/filesystem.py +++ b/daft/filesystem.py @@ -26,7 +26,7 @@ _resolve_filesystem_and_path, ) -from daft.daft import FileFormat, FileFormatConfig, ParquetSourceConfig +from daft.daft import FileFormat, FileFormatConfig, FileInfos, ParquetSourceConfig from daft.table import Table _CACHED_FSES: dict[str, FileSystem] = {} @@ -295,7 +295,7 @@ def glob_path_with_stats( path: str, file_format_config: FileFormatConfig | None, fs: fsspec.AbstractFileSystem, -) -> list[ListingInfo]: +) -> FileInfos: """Glob a path, returning a list ListingInfo.""" protocol = get_protocol_from_path(path) @@ -307,13 +307,11 @@ def glob_path_with_stats( for path, details in globbed_data.items(): path = _ensure_path_protocol(protocol, path) filepaths_to_infos[path]["size"] = details["size"] - filepaths_to_infos[path]["type"] = details["type"] elif fs.isfile(path): file_info = fs.info(path) filepaths_to_infos[path]["size"] = file_info["size"] - filepaths_to_infos[path]["type"] = file_info["type"] elif fs.isdir(path): files_info = fs.ls(path, detail=True) @@ -322,7 +320,6 @@ def glob_path_with_stats( path = file_info["name"] path = _ensure_path_protocol(protocol, path) filepaths_to_infos[path]["size"] = file_info["size"] - filepaths_to_infos[path]["type"] = file_info["type"] else: raise FileNotFoundError(f"File or directory not found: {path}") @@ -342,9 +339,15 @@ def glob_path_with_stats( for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas): filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows - return [ - ListingInfo(path=_ensure_path_protocol(protocol, path), **infos) for path, infos in filepaths_to_infos.items() - ] + file_paths = [] + file_sizes = [] + num_rows = [] + for path, infos in filepaths_to_infos.items(): + file_paths.append(_ensure_path_protocol(protocol, path)) + file_sizes.append(infos.get("size")) + num_rows.append(infos.get("rows")) + + return FileInfos.from_infos(file_paths=file_paths, file_sizes=file_sizes, num_rows=num_rows) def _get_parquet_metadata_single(path: str) -> pa.parquet.FileMetadata: diff --git a/daft/io/common.py b/daft/io/common.py index 4d57ca8c46..03dd25471d 100644 --- a/daft/io/common.py +++ 
b/daft/io/common.py @@ -25,11 +25,21 @@ def _get_tabular_files_scan( """Returns a TabularFilesScan LogicalPlan for a given glob filepath.""" paths = path if isinstance(path, list) else [str(path)] schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None + # Glob the path using the Runner + runner_io = get_context().runner().runner_io() + file_infos = runner_io.glob_paths_details(paths, file_format_config, fs) + + # Infer schema if no hints provided + inferred_or_provided_schema = ( + schema_hint + if schema_hint is not None + else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, fs) + ) # Construct plan builder_cls = get_context().logical_plan_builder_class() builder = builder_cls.from_tabular_scan( - paths=paths, - schema_hint=schema_hint, + file_infos=file_infos, + schema=inferred_or_provided_schema, file_format_config=file_format_config, fs=fs, ) diff --git a/daft/io/file_path.py b/daft/io/file_path.py index 9b9e254ddf..f94a7e948b 100644 --- a/daft/io/file_path.py +++ b/daft/io/file_path.py @@ -8,6 +8,8 @@ from daft.context import get_context from daft.daft import PartitionScheme, PartitionSpec from daft.dataframe import DataFrame +from daft.runners.pyrunner import LocalPartitionSet +from daft.table import Table @PublicAPI @@ -43,12 +45,14 @@ def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) -> """ context = get_context() runner_io = context.runner().runner_io() - partition_set = runner_io.glob_paths_details([path], fs=fs) - cache_entry = context.runner().put_partition_set_into_cache(partition_set) + file_infos = runner_io.glob_paths_details([path], fs=fs) + file_infos_table = Table._from_pytable(file_infos.to_table()) + partition = LocalPartitionSet({0: file_infos_table}) + cache_entry = context.runner().put_partition_set_into_cache(partition) builder_cls = context.logical_plan_builder_class() builder = builder_cls.from_in_memory_scan( cache_entry, - schema=runner_io.FS_LISTING_SCHEMA, - partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()), + schema=file_infos_table.schema(), + partition_spec=PartitionSpec(PartitionScheme.Unknown, partition.num_partitions()), ) return DataFrame(builder) diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 6f8179e2b7..dd7dac6ffa 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -9,6 +9,7 @@ from daft.daft import ( FileFormat, FileFormatConfig, + FileInfos, JoinType, PartitionScheme, PartitionSpec, @@ -80,9 +81,9 @@ def from_in_memory_scan( def from_tabular_scan( cls, *, - paths: list[str], + file_infos: FileInfos, + schema: Schema, file_format_config: FileFormatConfig, - schema_hint: Schema | None, fs: fsspec.AbstractFileSystem | None, ) -> LogicalPlanBuilder: pass diff --git a/daft/logical/logical_plan.py b/daft/logical/logical_plan.py index 3c7d80392f..cf6f05b29f 100644 --- a/daft/logical/logical_plan.py +++ b/daft/logical/logical_plan.py @@ -13,6 +13,7 @@ from daft.daft import ( FileFormat, FileFormatConfig, + FileInfos, JoinType, PartitionScheme, PartitionSpec, @@ -28,6 +29,7 @@ from daft.logical.map_partition_ops import ExplodeOp, MapPartitionOp from daft.logical.schema import Schema from daft.runners.partitioning import PartitionCacheEntry +from daft.runners.pyrunner import LocalPartitionSet from daft.table import Table if TYPE_CHECKING: @@ -111,39 +113,30 @@ def from_in_memory_scan( def from_tabular_scan( cls, *, - paths: list[str], + file_infos: FileInfos, + schema: Schema, file_format_config: 
FileFormatConfig, - schema_hint: Schema | None, fs: fsspec.AbstractFileSystem | None, ) -> PyLogicalPlanBuilder: - # Glob the path using the Runner - runner_io = get_context().runner().runner_io() - file_info_partition_set = runner_io.glob_paths_details(paths, file_format_config, fs) - - # Infer schema if no hints provided - inferred_or_provided_schema = ( - schema_hint - if schema_hint is not None - else runner_io.get_schema_from_first_filepath(file_info_partition_set, file_format_config, fs) - ) - cache_entry = get_context().runner().put_partition_set_into_cache(file_info_partition_set) + file_infos_table = Table._from_pytable(file_infos.to_table()) + partition = LocalPartitionSet({0: file_infos_table}) + cache_entry = get_context().runner().put_partition_set_into_cache(partition) filepath_plan = InMemoryScan( cache_entry=cache_entry, - schema=runner_io.FS_LISTING_SCHEMA, - partition_spec=PartitionSpec(PartitionScheme.Unknown, file_info_partition_set.num_partitions()), + schema=file_infos_table.schema(), + partition_spec=PartitionSpec(PartitionScheme.Unknown, len(file_infos)), ) return TabularFilesScan( - schema=inferred_or_provided_schema, + schema=schema, predicate=None, columns=None, file_format_config=file_format_config, fs=fs, filepaths_child=filepath_plan, - filepaths_column_name=runner_io.FS_LISTING_PATH_COLUMN_NAME, # WARNING: This is currently hardcoded to be the same number of partitions as rows!! This is because we emit # one partition per filepath. This will change in the future and our logic here should change accordingly. - num_partitions=len(file_info_partition_set), + num_partitions=len(file_infos), ).to_builder() def project( @@ -445,7 +438,6 @@ def __init__( predicate: ExpressionsProjection | None = None, columns: list[str] | None = None, filepaths_child: LogicalPlan, - filepaths_column_name: str, num_partitions: int | None = None, limit_rows: int | None = None, ) -> None: @@ -472,12 +464,7 @@ def __init__( self._fs = fs self._limit_rows = limit_rows - # TabularFilesScan has a single child node that provides the filepaths to read from. 
- assert ( - filepaths_child.schema()[filepaths_column_name] is not None - ), f"TabularFileScan requires a child with '{filepaths_column_name}' column" self._register_child(filepaths_child) - self._filepaths_column_name = filepaths_column_name @property def _filepaths_child(self) -> LogicalPlan: @@ -493,7 +480,7 @@ def __repr__(self) -> str: ) def required_columns(self) -> list[set[str]]: - return [{self._filepaths_column_name} | self._predicate.required_columns()] + return [{"file_paths"} | self._predicate.required_columns()] def input_mapping(self) -> list[dict[str, str]]: return [dict()] @@ -505,7 +492,6 @@ def _local_eq(self, other: Any) -> bool: and self._predicate == other._predicate and self._columns == other._columns and self._file_format_config == other._file_format_config - and self._filepaths_column_name == other._filepaths_column_name ) def rebuild(self) -> LogicalPlan: @@ -517,7 +503,6 @@ def rebuild(self) -> LogicalPlan: predicate=self._predicate if self._predicate is not None else None, columns=self._column_names, filepaths_child=child, - filepaths_column_name=self._filepaths_column_name, ) def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan: @@ -529,7 +514,6 @@ def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan predicate=self._predicate, columns=self._column_names, filepaths_child=new_children[0], - filepaths_column_name=self._filepaths_column_name, ) diff --git a/daft/logical/optimizer.py b/daft/logical/optimizer.py index 20bdce62e4..d25b3a2389 100644 --- a/daft/logical/optimizer.py +++ b/daft/logical/optimizer.py @@ -335,7 +335,6 @@ def _push_down_local_limit_into_scan(self, parent: LocalLimit, child: TabularFil file_format_config=child._file_format_config, fs=child._fs, filepaths_child=child._filepaths_child, - filepaths_column_name=child._filepaths_column_name, limit_rows=new_limit_rows, ) return new_scan @@ -359,7 +358,6 @@ def _push_down_projections_into_scan(self, parent: Projection, child: TabularFil file_format_config=child._file_format_config, fs=child._fs, filepaths_child=child._filepaths_child, - filepaths_column_name=child._filepaths_column_name, ) if any(not e._is_column() for e in parent._projection): return parent.copy_with_new_children([new_scan]) diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py index 771e690f33..7e9c5ad433 100644 --- a/daft/logical/rust_logical_plan.py +++ b/daft/logical/rust_logical_plan.py @@ -6,8 +6,7 @@ import fsspec from daft import DataType, col -from daft.context import get_context -from daft.daft import CountMode, FileFormat, FileFormatConfig, JoinType +from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest from daft.errors import ExpressionTypeError @@ -62,30 +61,14 @@ def from_in_memory_scan( def from_tabular_scan( cls, *, - paths: list[str], + file_infos: FileInfos, + schema: Schema, file_format_config: FileFormatConfig, - schema_hint: Schema | None, fs: fsspec.AbstractFileSystem | None, ) -> RustLogicalPlanBuilder: if fs is not None: raise ValueError("fsspec filesystems not supported for Rust query planner.") - # Glob the path using the Runner - runner_io = get_context().runner().runner_io() - file_info_partition_set = runner_io.glob_paths_details(paths, file_format_config, fs) - - # Infer schema if no hints provided - inferred_or_provided_schema = ( - schema_hint - if 
schema_hint is not None - else runner_io.get_schema_from_first_filepath(file_info_partition_set, file_format_config, fs) - ) - paths_details = file_info_partition_set.to_pydict() - filepaths = paths_details[runner_io.FS_LISTING_PATH_COLUMN_NAME] - filesizes = paths_details[runner_io.FS_LISTING_SIZE_COLUMN_NAME] - filerows = paths_details[runner_io.FS_LISTING_ROWS_COLUMN_NAME] - - rs_schema = inferred_or_provided_schema._schema - builder = _LogicalPlanBuilder.table_scan(filepaths, filesizes, filerows, rs_schema, file_format_config) + builder = _LogicalPlanBuilder.table_scan(file_infos, schema._schema, file_format_config) return cls(builder) def project( diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index c2b86bffae..e41878a6b3 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -7,10 +7,9 @@ import fsspec import psutil -import pyarrow as pa from loguru import logger -from daft.daft import FileFormatConfig, ResourceRequest +from daft.daft import FileFormatConfig, FileInfos, ResourceRequest from daft.execution import physical_plan from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask from daft.filesystem import get_filesystem_from_path, glob_path_with_stats @@ -68,64 +67,37 @@ def wait(self) -> None: pass -class PyRunnerIO(runner_io.RunnerIO[Table]): +class PyRunnerIO(runner_io.RunnerIO): def glob_paths_details( self, source_paths: list[str], file_format_config: FileFormatConfig | None = None, fs: fsspec.AbstractFileSystem | None = None, - ) -> LocalPartitionSet: - all_files_infos = [] + ) -> FileInfos: + file_infos = FileInfos() for source_path in source_paths: if fs is None: fs = get_filesystem_from_path(source_path) - files_info = glob_path_with_stats(source_path, file_format_config, fs) + path_file_infos = glob_path_with_stats(source_path, file_format_config, fs) - if len(files_info) == 0: + if len(path_file_infos) == 0: raise FileNotFoundError(f"No files found at {source_path}") - all_files_infos.extend(files_info) - - partition = Table.from_pydict( - { - "path": pa.array([file_info.path for file_info in all_files_infos], type=pa.string()), - "size": pa.array([file_info.size for file_info in all_files_infos], type=pa.int64()), - "type": pa.array([file_info.type for file_info in all_files_infos], type=pa.string()), - "rows": pa.array([file_info.rows for file_info in all_files_infos], type=pa.int64()), - }, - ) - - # Make sure that the schema is consistent with what we expect - assert ( - partition.schema() == PyRunnerIO.FS_LISTING_SCHEMA - ), f"Schema should be expected: {PyRunnerIO.FS_LISTING_SCHEMA}, but received: {partition.schema()}" - - pset = LocalPartitionSet( - { - # Hardcoded to 1 partition - 0: partition, - } - ) - return pset + file_infos.extend(path_file_infos) + + return file_infos def get_schema_from_first_filepath( self, - listing_details_partitions: PartitionSet[Table], + file_infos: FileInfos, file_format_config: FileFormatConfig, fs: fsspec.AbstractFileSystem | None, ) -> Schema: - # Naively retrieve the first filepath in the PartitionSet - nonempty_partitions = [ - p - for p, p_len in zip(listing_details_partitions.values(), listing_details_partitions.len_of_partitions()) - if p_len > 0 - ] - if len(nonempty_partitions) == 0: + if len(file_infos) == 0: raise ValueError("No files to get schema from") - first_filepath = nonempty_partitions[0].to_pydict()[PyRunnerIO.FS_LISTING_PATH_COLUMN_NAME][0] - - return runner_io.sample_schema(first_filepath, file_format_config, fs) + # Naively retrieve the 
first filepath from the file infos
+        return runner_io.sample_schema(file_infos[0].file_path, file_format_config, fs)
 
 
 class PyRunner(Runner[Table]):
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index cfd552c53b..b3ff3b16d0 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -23,7 +23,7 @@
     )
     raise
 
-from daft.daft import FileFormatConfig, ResourceRequest
+from daft.daft import FileFormatConfig, FileInfos, ResourceRequest
 from daft.datatype import DataType
 from daft.execution.execution_step import (
     FanoutInstruction,
@@ -65,37 +65,22 @@
 
 
 @ray.remote
-def _glob_path_into_details_vpartitions(
+def _glob_path_into_file_infos(
     paths: list[str],
-    schema: Schema,
     file_format_config: FileFormatConfig | None,
     fs: fsspec.AbstractFileSystem | None,
-) -> list[tuple[PartID, Table]]:
-    all_listing_infos = []
+) -> FileInfos:
+    file_infos = FileInfos()
     for path in paths:
         if fs is None:
             fs = get_filesystem_from_path(path)
-        listing_infos = glob_path_with_stats(path, file_format_config, fs)
-        if len(listing_infos) == 0:
+        path_file_infos = glob_path_with_stats(path, file_format_config, fs)
+        if len(path_file_infos) == 0:
             raise FileNotFoundError(f"No files found at {path}")
-        all_listing_infos.extend(listing_infos)
-
-    # Hardcoded to 1 partition
-    partition = Table.from_pydict(
-        {
-            "path": pa.array([file_info.path for file_info in all_listing_infos], type=pa.string()),
-            "size": pa.array([file_info.size for file_info in all_listing_infos], type=pa.int64()),
-            "type": pa.array([file_info.type for file_info in all_listing_infos], type=pa.string()),
-            "rows": pa.array([file_info.rows for file_info in all_listing_infos], type=pa.int64()),
-        },
-    )
-    assert partition.schema() == schema, f"Schema should be expected: {schema}, but received: {partition.schema()}"
-
-    partition_ref = ray.put(partition)
-    partition_refs = [(0, partition_ref)]
+        file_infos.extend(path_file_infos)
 
-    return partition_refs
+    return file_infos
 
 
 @ray.remote
@@ -135,18 +120,14 @@ def remote_len_partition(p: Table) -> int:
 
 
 @ray.remote
-def sample_schema_from_filepath_vpartition(
-    p: Table,
-    filepath_column: str,
+def sample_schema_from_filepath(
+    first_file_path: str,
     file_format_config: FileFormatConfig,
     fs: fsspec.AbstractFileSystem | None,
 ) -> Schema:
-    """Ray remote function to run schema sampling on top of a Table containing filepaths"""
-    assert len(p) > 0
-
+    """Ray remote function to sample a schema from a single filepath"""
     # Currently just samples the Schema from the first file
-    first_filepath = p.to_pydict()[filepath_column][0]
-    return runner_io.sample_schema(first_filepath, file_format_config, fs)
+    return runner_io.sample_schema(first_file_path, file_format_config, fs)
 
 
 @dataclass
@@ -221,38 +202,29 @@ def wait(self) -> None:
         ray.wait([o for o in self._partitions.values()])
 
 
-class RayRunnerIO(runner_io.RunnerIO[ray.ObjectRef]):
+class RayRunnerIO(runner_io.RunnerIO):
     def glob_paths_details(
         self,
         source_paths: list[str],
         file_format_config: FileFormatConfig | None = None,
         fs: fsspec.AbstractFileSystem | None = None,
-    ) -> RayPartitionSet:
-        partition_refs = ray.get(
-            _glob_path_into_details_vpartitions.remote(
-                source_paths, RayRunnerIO.FS_LISTING_SCHEMA, file_format_config, fs=fs
-            )
-        )
-        return RayPartitionSet({part_id: part for part_id, part in partition_refs})
+    ) -> FileInfos:
+        # Synchronously fetch the file infos, for now.
+ return ray.get(_glob_path_into_file_infos.remote(source_paths, file_format_config, fs=fs)) def get_schema_from_first_filepath( self, - listing_details_partitions: PartitionSet[ray.ObjectRef], + file_infos: FileInfos, file_format_config: FileFormatConfig, fs: fsspec.AbstractFileSystem | None, ) -> Schema: - nonempty_partitions: list[ray.ObjectRef] = [ - p - for p, p_len in zip(listing_details_partitions.values(), listing_details_partitions.len_of_partitions()) - if p_len > 0 - ] - if len(nonempty_partitions) == 0: + if len(file_infos) == 0: raise ValueError("No files to get schema from") - partition: ray.ObjectRef = nonempty_partitions[0] + # Naively retrieve the first filepath in the file info table. + first_path = file_infos[0].file_path return ray.get( - sample_schema_from_filepath_vpartition.remote( - partition, - RayRunnerIO.FS_LISTING_PATH_COLUMN_NAME, + sample_schema_from_filepath.remote( + first_path, file_format_config, fs, ) diff --git a/daft/runners/runner_io.py b/daft/runners/runner_io.py index 02b1822f0e..45be394dbb 100644 --- a/daft/runners/runner_io.py +++ b/daft/runners/runner_io.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import Generic, TypeVar +from typing import TypeVar import fsspec @@ -9,61 +9,45 @@ CsvSourceConfig, FileFormat, FileFormatConfig, + FileInfos, JsonSourceConfig, ParquetSourceConfig, ) -from daft.datatype import DataType from daft.filesystem import get_filesystem_from_path from daft.logical.schema import Schema -from daft.runners.partitioning import PartitionSet, TableParseCSVOptions +from daft.runners.partitioning import TableParseCSVOptions from daft.table import schema_inference PartitionT = TypeVar("PartitionT") -class RunnerIO(Generic[PartitionT]): +class RunnerIO: """Reading and writing data from the Runner. This is an abstract class and each runner must write their own implementation. - - This is a generic class and each runner needs to parametrize their implementation with the appropriate - PartitionT that it uses for in-memory data representation. """ - FS_LISTING_PATH_COLUMN_NAME = "path" - FS_LISTING_SIZE_COLUMN_NAME = "size" - FS_LISTING_TYPE_COLUMN_NAME = "type" - FS_LISTING_ROWS_COLUMN_NAME = "rows" - FS_LISTING_SCHEMA = Schema._from_field_name_and_types( - [ - (FS_LISTING_SIZE_COLUMN_NAME, DataType.int64()), - (FS_LISTING_PATH_COLUMN_NAME, DataType.string()), - (FS_LISTING_TYPE_COLUMN_NAME, DataType.string()), - (FS_LISTING_ROWS_COLUMN_NAME, DataType.int64()), - ] - ) - @abstractmethod def glob_paths_details( self, source_path: list[str], file_format_config: FileFormatConfig | None = None, fs: fsspec.AbstractFileSystem | None = None, - ) -> PartitionSet[PartitionT]: - """Globs the specified filepath to construct Partitions containing file and dir metadata + ) -> FileInfos: + """Globs the specified filepath to construct a FileInfos object containing file and dir metadata. Args: source_path (str): path to glob Returns: - PartitionSet[PartitionT]: Partitions containing the listings' metadata + FileInfo: The file infos for the globbed paths. 
""" raise NotImplementedError() @abstractmethod def get_schema_from_first_filepath( self, - listing_details_partitions: PartitionSet[PartitionT], + file_info: FileInfos, file_format_config: FileFormatConfig, fs: fsspec.AbstractFileSystem | None, ) -> Schema: diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 901e1daa4c..d17cf8c7a3 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -9,7 +9,7 @@ use { planner::plan, sink_info::{OutputFileInfo, SinkInfo}, source_info::{ - ExternalInfo as ExternalSourceInfo, FileInfo as InputFileInfo, InMemoryInfo, + ExternalInfo as ExternalSourceInfo, FileInfos as InputFileInfos, InMemoryInfo, PyFileFormatConfig, SourceInfo, }, FileFormat, JoinType, PartitionScheme, PartitionSpec, PhysicalPlanScheduler, @@ -58,16 +58,14 @@ impl LogicalPlanBuilder { #[staticmethod] pub fn table_scan( - file_paths: Vec, - file_sizes: Vec>, - file_rows: Vec>, + file_infos: InputFileInfos, schema: &PySchema, file_format_config: PyFileFormatConfig, ) -> PyResult { - let num_partitions = file_paths.len(); + let num_partitions = file_infos.len(); let source_info = SourceInfo::ExternalInfo(ExternalSourceInfo::new( schema.schema.clone(), - InputFileInfo::new(file_paths, file_sizes, file_rows).into(), + file_infos.into(), file_format_config.into(), )); let partition_spec = PartitionSpec::new(PartitionScheme::Unknown, num_partitions, None); diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index 48bff0dab2..1f03a2c608 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -23,7 +23,8 @@ pub use partitioning::{PartitionScheme, PartitionSpec}; pub use physical_plan::PhysicalPlanScheduler; pub use resource_request::ResourceRequest; pub use source_info::{ - CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig, + CsvSourceConfig, FileFormat, FileInfo, FileInfos, JsonSourceConfig, ParquetSourceConfig, + PyFileFormatConfig, }; #[cfg(feature = "python")] @@ -42,6 +43,8 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; Ok(()) } diff --git a/src/daft-plan/src/ops/source.rs b/src/daft-plan/src/ops/source.rs index c0feae5585..8e19c9c33e 100644 --- a/src/daft-plan/src/ops/source.rs +++ b/src/daft-plan/src/ops/source.rs @@ -66,11 +66,11 @@ impl Source { match self.source_info.as_ref() { SourceInfo::ExternalInfo(ExternalInfo { source_schema, - file_info, + file_infos, file_format_config, }) => { res.push(format!("Source: {:?}", file_format_config.var_name())); - for fp in file_info.file_paths.iter() { + for fp in file_infos.file_paths.iter() { res.push(format!("File paths = {}", fp)); } res.push(format!("File schema = {}", source_schema.short_string())); diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 91cbbbca8b..6e3f50ae76 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -3,14 +3,13 @@ use { crate::{ sink_info::OutputFileInfo, source_info::{ - ExternalInfo, FileFormat, FileFormatConfig, FileInfo, InMemoryInfo, PyFileFormatConfig, + ExternalInfo, FileFormat, FileFormatConfig, FileInfos, InMemoryInfo, PyFileFormatConfig, }, }, daft_core::python::schema::PySchema, daft_core::schema::SchemaRef, daft_dsl::python::PyExpr, daft_dsl::Expr, - daft_table::python::PyTable, pyo3::{ exceptions::PyValueError, pyclass, pymethods, @@ -123,11 +122,10 @@ fn 
     py: Python<'_>,
     source_schema: &SchemaRef,
     projection_schema: &SchemaRef,
-    file_info: &Arc<FileInfo>,
+    file_infos: &Arc<FileInfos>,
     file_format_config: &Arc<FileFormatConfig>,
     limit: &Option<usize>,
 ) -> PyResult<PyObject> {
-    let file_info_table: PyTable = file_info.to_table()?.into();
     let columns_to_read = projection_schema
         .fields
         .iter()
@@ -140,7 +138,7 @@ fn tabular_scan(
         .call1((
             PySchema::from(source_schema.clone()),
             columns_to_read,
-            file_info_table,
+            file_infos.to_table()?,
             PyFileFormatConfig::from(file_format_config.clone()),
             *limit,
         ))?;
@@ -203,7 +201,7 @@ impl PhysicalPlan {
                 external_info:
                     ExternalInfo {
                         source_schema,
-                        file_info,
+                        file_infos,
                         file_format_config,
                         ..
                     },
@@ -213,7 +211,7 @@
                 py,
                 source_schema,
                 projection_schema,
-                file_info,
+                file_infos,
                 file_format_config,
                 limit,
             ),
@@ -222,7 +220,7 @@
                 external_info:
                     ExternalInfo {
                         source_schema,
-                        file_info,
+                        file_infos,
                         file_format_config,
                         ..
                     },
@@ -232,7 +230,7 @@
                 py,
                 source_schema,
                 projection_schema,
-                file_info,
+                file_infos,
                 file_format_config,
                 limit,
             ),
@@ -241,7 +239,7 @@
                 external_info:
                     ExternalInfo {
                         source_schema,
-                        file_info,
+                        file_infos,
                         file_format_config,
                         ..
                     },
@@ -251,7 +249,7 @@
                 py,
                 source_schema,
                 projection_schema,
-                file_info,
+                file_infos,
                 file_format_config,
                 limit,
             ),
diff --git a/src/daft-plan/src/source_info.rs b/src/daft-plan/src/source_info.rs
index 6b84accd43..52e26614a7 100644
--- a/src/daft-plan/src/source_info.rs
+++ b/src/daft-plan/src/source_info.rs
@@ -16,8 +16,9 @@ use daft_table::Table;
 #[cfg(feature = "python")]
 use {
     daft_io::python::IOConfig as PyIOConfig,
+    daft_table::python::PyTable,
     pyo3::{
-        exceptions::PyValueError,
+        exceptions::{PyKeyError, PyValueError},
         pyclass,
         pyclass::CompareOp,
         pymethods,
@@ -154,33 +155,119 @@ impl Hash for InMemoryInfo {
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct ExternalInfo {
     pub source_schema: SchemaRef,
-    pub file_info: Arc<FileInfo>,
+    pub file_infos: Arc<FileInfos>,
     pub file_format_config: Arc<FileFormatConfig>,
 }
 
 impl ExternalInfo {
     pub fn new(
         source_schema: SchemaRef,
-        file_info: Arc<FileInfo>,
+        file_infos: Arc<FileInfos>,
         file_format_config: Arc<FileFormatConfig>,
     ) -> Self {
         Self {
             source_schema,
-            file_info,
+            file_infos,
             file_format_config,
         }
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))]
 pub struct FileInfo {
+    pub file_path: String,
+    pub file_size: Option<i64>,
+    pub num_rows: Option<i64>,
+}
+
+#[cfg(feature = "python")]
+#[pymethods]
+impl FileInfo {
+    #[new]
+    pub fn new(file_path: String, file_size: Option<i64>, num_rows: Option<i64>) -> Self {
+        Self::new_internal(file_path, file_size, num_rows)
+    }
+}
+
+impl FileInfo {
+    pub fn new_internal(file_path: String, file_size: Option<i64>, num_rows: Option<i64>) -> Self {
+        Self {
+            file_path,
+            file_size,
+            num_rows,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))]
+pub struct FileInfos {
     pub file_paths: Vec<String>,
     pub file_sizes: Vec<Option<i64>>,
     pub num_rows: Vec<Option<i64>>,
 }
 
-impl FileInfo {
-    pub fn new(
+#[cfg(feature = "python")]
+#[pymethods]
+impl FileInfos {
+    #[new]
+    #[pyo3(signature = (*args))]
+    pub fn new(args: &PyTuple) -> PyResult<Self> {
+        match args.len() {
+            // Create an empty FileInfos, to be overridden by __setstate__ and/or extended with self.extend().
+            0 => Ok(Self::new_internal(vec![], vec![], vec![])),
+            _ => Err(PyValueError::new_err(format!(
+                "expected no arguments to make new FileInfos, got: {}",
+                args.len()
+            ))),
+        }
+    }
+
+    #[staticmethod]
+    pub fn from_infos(
+        file_paths: Vec<String>,
+        file_sizes: Vec<Option<i64>>,
+        num_rows: Vec<Option<i64>>,
+    ) -> Self {
+        Self::new_internal(file_paths, file_sizes, num_rows)
+    }
+
+    #[staticmethod]
+    pub fn from_table(table: PyTable) -> PyResult<Self> {
+        Ok(Self::from_table_internal(table.table)?)
+    }
+
+    pub fn extend(&mut self, new_infos: Self) {
+        self.file_paths.extend(new_infos.file_paths);
+        self.file_sizes.extend(new_infos.file_sizes);
+        self.num_rows.extend(new_infos.num_rows);
+    }
+
+    pub fn __getitem__(&self, idx: isize) -> PyResult<FileInfo> {
+        if idx as usize >= self.len() {
+            return Err(PyKeyError::new_err(idx));
+        }
+        Ok(FileInfo::new_internal(
+            self.file_paths[idx as usize].clone(),
+            self.file_sizes[idx as usize],
+            self.num_rows[idx as usize],
+        ))
+    }
+
+    pub fn to_table(&self) -> PyResult<PyTable> {
+        Ok(self.to_table_internal()?.into())
+    }
+
+    pub fn __len__(&self) -> PyResult<usize> {
+        Ok(self.len())
+    }
+}
+
+impl_bincode_py_state_serialization!(FileInfos);
+
+impl FileInfos {
+    pub fn new_internal(
         file_paths: Vec<String>,
         file_sizes: Vec<Option<i64>>,
         num_rows: Vec<Option<i64>>,
@@ -191,29 +278,62 @@ impl FileInfo {
             num_rows,
         }
     }
-    pub fn to_table(&self) -> DaftResult<Table> {
-        let file_paths: Vec<Option<&str>> =
-            self.file_paths.iter().map(|s| Some(s.as_str())).collect();
-        let num_files = file_paths.len();
+
+    pub fn from_table_internal(table: Table) -> DaftResult<Self> {
+        let file_paths = table
+            .get_column("file_paths")?
+            .utf8()?
+            .data()
+            .as_any()
+            .downcast_ref::<arrow2::array::Utf8Array<i64>>()
+            .unwrap()
+            .iter()
+            .map(|s| s.unwrap().to_string())
+            .collect::<Vec<_>>();
+        let file_sizes = table
+            .get_column("file_sizes")?
+            .i64()?
+            .data()
+            .as_any()
+            .downcast_ref::<arrow2::array::Int64Array>()
+            .unwrap()
+            .iter()
+            .map(|n| n.cloned())
+            .collect::<Vec<_>>();
+        let num_rows = table
+            .get_column("num_rows")?
+            .i64()?
+            .data()
+            .as_any()
+            .downcast_ref::<arrow2::array::Int64Array>()
+            .unwrap()
+            .iter()
+            .map(|n| n.cloned())
+            .collect::<Vec<_>>();
+        Ok(Self::new_internal(file_paths, file_sizes, num_rows))
+    }
+
+    pub fn len(&self) -> usize {
+        self.file_paths.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.file_paths.is_empty()
+    }
+
+    pub fn to_table_internal(&self) -> DaftResult<Table> {
         let columns = vec![
             Series::try_from((
-                "path",
-                arrow2::array::Utf8Array::<i64>::from(file_paths).to_boxed(),
+                "file_paths",
+                arrow2::array::Utf8Array::<i64>::from_iter_values(self.file_paths.iter())
+                    .to_boxed(),
             ))?,
             Series::try_from((
-                "size",
+                "file_sizes",
                 arrow2::array::PrimitiveArray::<i64>::from(&self.file_sizes).to_boxed(),
             ))?,
             Series::try_from((
-                "type",
-                arrow2::array::Utf8Array::<i64>::new_null(
-                    arrow2::datatypes::DataType::LargeUtf8,
-                    num_files,
-                )
-                .to_boxed(),
-            ))?,
-            Series::try_from((
-                "rows",
+                "num_rows",
                 arrow2::array::PrimitiveArray::<i64>::from(&self.num_rows).to_boxed(),
             ))?,
         ];
diff --git a/src/daft-plan/src/test/mod.rs b/src/daft-plan/src/test/mod.rs
index ad38710d4d..f763afe14b 100644
--- a/src/daft-plan/src/test/mod.rs
+++ b/src/daft-plan/src/test/mod.rs
@@ -4,7 +4,7 @@ use daft_core::{datatypes::Field, schema::Schema};
 
 use crate::{
     ops::Source,
-    source_info::{ExternalInfo, FileFormatConfig, FileInfo, SourceInfo},
+    source_info::{ExternalInfo, FileFormatConfig, FileInfos, SourceInfo},
     JsonSourceConfig, PartitionSpec,
 };
 
@@ -15,7 +15,7 @@ pub fn dummy_scan_node(fields: Vec<Field>) -> Source {
         schema.clone(),
         SourceInfo::ExternalInfo(ExternalInfo::new(
             schema.clone(),
-            FileInfo::new(vec!["/foo".to_string()], vec![None], vec![None]).into(),
+            FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]).into(),
             FileFormatConfig::Json(JsonSourceConfig {}).into(),
         ))
         .into(),
diff --git a/tests/cookbook/test_dataloading.py b/tests/cookbook/test_dataloading.py
index 2cb7045207..7eb9f9ecbe 100644
--- a/tests/cookbook/test_dataloading.py
+++ b/tests/cookbook/test_dataloading.py
@@ -84,12 +84,12 @@ def test_glob_files(tmpdir):
     daft_df = daft.from_glob_path(f"{tmpdir}/*.foo")
     daft_pd_df = daft_df.to_pandas()
     pd_df = pd.DataFrame.from_records(
-        {"path": str(path), "size": size, "type": "file", "rows": None}
+        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
         for path, size in zip(filepaths, list(range(10)))
     )
-    pd_df = pd_df[~pd_df["path"].str.endswith(".bar")]
-    pd_df = pd_df.astype({"rows": float})
-    assert_df_equals(daft_pd_df, pd_df, sort_key="path")
+    pd_df = pd_df[~pd_df["file_paths"].str.endswith(".bar")]
+    pd_df = pd_df.astype({"num_rows": float})
+    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
 
 
 def test_glob_files_single_file(tmpdir):
@@ -97,9 +97,9 @@ def test_glob_files_single_file(tmpdir):
     filepath.write_text("b" * 10)
     daft_df = daft.from_glob_path(f"{tmpdir}/file.foo")
     daft_pd_df = daft_df.to_pandas()
-    pd_df = pd.DataFrame.from_records([{"path": str(filepath), "size": 10, "type": "file", "rows": None}])
-    pd_df = pd_df.astype({"rows": float})
-    assert_df_equals(daft_pd_df, pd_df, sort_key="path")
+    pd_df = pd.DataFrame.from_records([{"file_paths": str(filepath), "file_sizes": 10, "num_rows": None}])
+    pd_df = pd_df.astype({"num_rows": float})
+    assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths")
 
 
 def test_glob_files_directory(tmpdir):
@@ -116,16 +116,16 @@ def test_glob_files_directory(tmpdir):
     daft_pd_df = daft_df.to_pandas()
 
     listing_records = [
-        {"path": str(path), "size": size, "type": "file", "rows": None}
+        {"file_paths": str(path), "file_sizes": size, "num_rows": None}
        for path, size in zip(filepaths, [i for i in range(10) for _ in range(2)])
     ]
     listing_records = listing_records + [
-        {"path": str(extra_empty_dir), "size": extra_empty_dir.stat().st_size, "type": "directory", "rows": None}
+        {"file_paths": str(extra_empty_dir), "file_sizes": extra_empty_dir.stat().st_size, "num_rows": None}
     ]
     pd_df =
pd.DataFrame.from_records(listing_records) - pd_df = pd_df.astype({"rows": float}) + pd_df = pd_df.astype({"num_rows": float}) - assert_df_equals(daft_pd_df, pd_df, sort_key="path") + assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths") def test_glob_files_recursive(tmpdir): @@ -142,16 +142,16 @@ def test_glob_files_recursive(tmpdir): daft_pd_df = daft_df.to_pandas() listing_records = [ - {"path": str(path), "size": size, "type": "file", "rows": None} + {"file_paths": str(path), "file_sizes": size, "num_rows": None} for path, size in zip(paths, [i for i in range(10) for _ in range(2)]) ] listing_records = listing_records + [ - {"path": str(nested_dir_path), "size": nested_dir_path.stat().st_size, "type": "directory", "rows": None} + {"file_paths": str(nested_dir_path), "file_sizes": nested_dir_path.stat().st_size, "num_rows": None} ] pd_df = pd.DataFrame.from_records(listing_records) - pd_df = pd_df.astype({"rows": float}) + pd_df = pd_df.astype({"num_rows": float}) - assert_df_equals(daft_pd_df, pd_df, sort_key="path") + assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths") @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") @@ -176,9 +176,9 @@ def test_glob_files_custom_fs(tmpdir): daft_pd_df = daft_df.to_pandas() pd_df = pd.DataFrame.from_records( - {"path": str(path), "size": size, "type": "file", "rows": None} + {"file_paths": str(path), "file_sizes": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) ) - pd_df = pd_df[~pd_df["path"].str.endswith(".bar")] - pd_df = pd_df.astype({"rows": float}) - assert_df_equals(daft_pd_df, pd_df, sort_key="path") + pd_df = pd_df[~pd_df["file_paths"].str.endswith(".bar")] + pd_df = pd_df.astype({"num_rows": float}) + assert_df_equals(daft_pd_df, pd_df, sort_key="file_paths") diff --git a/tests/cookbook/test_image.py b/tests/cookbook/test_image.py index 02f5d3efad..94a449286e 100644 --- a/tests/cookbook/test_image.py +++ b/tests/cookbook/test_image.py @@ -68,7 +68,7 @@ def test_image_decode() -> None: df = ( daft.from_glob_path(f"{ASSET_FOLDER}/images/**") .into_partitions(2) - .with_column("image", col("path").url.download().image.decode().image.resize(10, 10)) + .with_column("image", col("file_paths").url.download().image.decode().image.resize(10, 10)) ) target_dtype = DataType.image() assert df.schema()["image"].dtype == target_dtype diff --git a/tests/optimizer/test_pushdown_clauses_into_scan.py b/tests/optimizer/test_pushdown_clauses_into_scan.py index 2d63b94bf8..2105a080ca 100644 --- a/tests/optimizer/test_pushdown_clauses_into_scan.py +++ b/tests/optimizer/test_pushdown_clauses_into_scan.py @@ -29,7 +29,6 @@ def test_push_projection_scan_all_cols(valid_data_json_path: str, optimizer): file_format_config=df_unoptimized_scan._get_current_builder()._plan._file_format_config, fs=df_unoptimized_scan._get_current_builder()._plan._fs, filepaths_child=df_unoptimized_scan._get_current_builder()._plan._filepaths_child, - filepaths_column_name=df_unoptimized_scan._get_current_builder()._plan._filepaths_column_name, ).to_builder() ) @@ -50,7 +49,6 @@ def test_push_projection_scan_all_cols_alias(valid_data_json_path: str, optimize file_format_config=df_unoptimized_scan._get_current_builder()._plan._file_format_config, fs=df_unoptimized_scan._get_current_builder()._plan._fs, filepaths_child=df_unoptimized_scan._get_current_builder()._plan._filepaths_child, - filepaths_column_name=df_unoptimized_scan._get_current_builder()._plan._filepaths_column_name, ).to_builder() ) 
df_optimized = df_optimized.select(col("sepal_length").alias("foo")) @@ -72,7 +70,6 @@ def test_push_projection_scan_some_cols_aliases(valid_data_json_path: str, optim file_format_config=df_unoptimized_scan._get_current_builder()._plan._file_format_config, fs=df_unoptimized_scan._get_current_builder()._plan._fs, filepaths_child=df_unoptimized_scan._get_current_builder()._plan._filepaths_child, - filepaths_column_name=df_unoptimized_scan._get_current_builder()._plan._filepaths_column_name, ).to_builder() ) df_optimized = df_optimized.select(col("sepal_length").alias("foo"), col("sepal_width") + 1)
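
Usage sketch (not part of the patch): a minimal Python example of the FileInfos binding this change exposes, assuming a daft build that includes the patch; the bucket paths and size/row values below are hypothetical placeholders.

# hedged_fileinfos_example.py
from daft.daft import FileInfos
from daft.table import Table

# Build a FileInfos from parallel lists; unknown sizes/row counts stay None,
# mirroring what daft.filesystem.glob_path_with_stats now returns.
infos = FileInfos.from_infos(
    file_paths=["s3://bucket/a.parquet", "s3://bucket/b.parquet"],
    file_sizes=[1024, None],
    num_rows=[100, None],
)

# Runners start from an empty FileInfos and merge per-glob listings into it.
merged = FileInfos()
merged.extend(infos)

assert len(merged) == 2          # __len__
first = merged[0]                # __getitem__ returns a FileInfo
print(first.file_path, first.file_size, first.num_rows)

# Convert to a Daft Table with file_paths / file_sizes / num_rows columns,
# which is what from_glob_path() places into the partition cache.
file_infos_table = Table._from_pytable(merged.to_table())
print(file_infos_table.schema())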