[FEAT] [New Query Planner] Refactor file globbing logic by exposing FileInfos to Python #1307

Merged (1 commit, Aug 30, 2023)
6 changes: 1 addition & 5 deletions daft/execution/execution_step.py
@@ -319,7 +319,6 @@ class ReadFile(SingleOutputInstruction):
fs: fsspec.AbstractFileSystem | None
columns_to_read: list[str] | None
file_format_config: FileFormatConfig
-filepaths_column_name: str

def run(self, inputs: list[Table]) -> list[Table]:
return self._read_file(inputs)
@@ -352,10 +351,7 @@ def _handle_tabular_files_scan(
filepaths_partition: Table,
) -> Table:
data = filepaths_partition.to_pydict()
-assert (
-    self.filepaths_column_name in data
-), f"TabularFilesScan should be ran on vPartitions with '{self.filepaths_column_name}' column"
-filepaths = data[self.filepaths_column_name]
+filepaths = data["file_paths"]
Review comment (Contributor):
Should we make this a constant somewhere?

Reply (clarkzinzow, Contributor Author, Aug 30, 2023):
Eh, we weren't using a constant for "file_sizes" or "num_rows", and I personally think that we can keep the contract informal for now, since we're going to continue to rework this as (1) we move to the new query planner, (2) we move to the Rust-native file globber, and (3) we move file globbing to execution time.

If you feel strongly about this, I can try to expose the column names on FileInfo and FileInfos somehow, and propagate them as sidecar data whenever we pass around the Table representation.

Reply (Contributor):
Naw, I don't feel strongly about this 😛 lgtm!


if self.index is not None:
filepaths = [filepaths[self.index]]
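The informal contract the thread above settles on: the file-listing Table carries "file_paths", "file_sizes", and "num_rows" columns, and consumers index them with bare string literals. For comparison, a minimal sketch of the constants-based alternative the reviewer floats; the file_info_columns module and its usage are hypothetical, not part of this PR:

    # daft/runners/file_info_columns.py (hypothetical module, not part of this PR)
    # Single source of truth for the FileInfos table's column names.
    FILE_PATHS = "file_paths"
    FILE_SIZES = "file_sizes"
    NUM_ROWS = "num_rows"

    # Consumers such as _handle_tabular_files_scan would then write
    #     filepaths = data[FILE_PATHS]
    # instead of the bare literal
    #     filepaths = data["file_paths"]

The PR keeps the bare strings on the grounds that this contract will keep churning through the planner and globber rewrites.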
7 changes: 3 additions & 4 deletions daft/execution/physical_plan.py
@@ -70,7 +70,6 @@ def file_read(
fs: fsspec.AbstractFileSystem | None,
columns_to_read: list[str] | None,
file_format_config: FileFormatConfig,
-filepaths_column_name: str,
) -> InProgressPhysicalPlan[PartitionT]:
"""child_plan represents partitions with filenames.

@@ -85,8 +84,9 @@
done_task = materializations.popleft()

vpartition = done_task.vpartition()
-file_sizes_bytes = vpartition.to_pydict()["size"]
-file_rows = vpartition.to_pydict()["rows"]
+file_infos = vpartition.to_pydict()
+file_sizes_bytes = file_infos["file_sizes"]
+file_rows = file_infos["num_rows"]

# Emit one partition for each file (NOTE: hardcoded for now).
for i in range(len(vpartition)):
@@ -102,7 +102,6 @@
fs=fs,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
-filepaths_column_name=filepaths_column_name,
),
# Set the filesize as the memory request.
# (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
1 change: 0 additions & 1 deletion daft/execution/physical_plan_factory.py
@@ -42,7 +42,6 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
fs=node._fs,
columns_to_read=node._column_names,
file_format_config=node._file_format_config,
-filepaths_column_name=node._filepaths_column_name,
)

elif isinstance(node, logical_plan.Filter):
5 changes: 1 addition & 4 deletions daft/execution/rust_physical_plan_shim.py
@@ -39,16 +39,13 @@ def tabular_scan(
parts_t = cast(Iterator[PartitionT], parts)

file_info_iter = physical_plan.partition_read(iter(parts_t))
-filepaths_column_name = get_context().runner().runner_io().FS_LISTING_PATH_COLUMN_NAME
-pyschema = Schema._from_pyschema(schema)
return physical_plan.file_read(
child_plan=file_info_iter,
limit_rows=limit,
-schema=pyschema,
+schema=Schema._from_pyschema(schema),
fs=None,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
-filepaths_column_name=filepaths_column_name,
)


19 changes: 11 additions & 8 deletions daft/filesystem.py
@@ -26,7 +26,7 @@
_resolve_filesystem_and_path,
)

-from daft.daft import FileFormat, FileFormatConfig, ParquetSourceConfig
+from daft.daft import FileFormat, FileFormatConfig, FileInfos, ParquetSourceConfig
from daft.table import Table

_CACHED_FSES: dict[str, FileSystem] = {}
@@ -295,7 +295,7 @@ def glob_path_with_stats(
path: str,
file_format_config: FileFormatConfig | None,
fs: fsspec.AbstractFileSystem,
-) -> list[ListingInfo]:
+) -> FileInfos:
"""Glob a path, returning a list ListingInfo."""
protocol = get_protocol_from_path(path)

@@ -307,13 +307,11 @@
for path, details in globbed_data.items():
path = _ensure_path_protocol(protocol, path)
filepaths_to_infos[path]["size"] = details["size"]
filepaths_to_infos[path]["type"] = details["type"]

elif fs.isfile(path):
file_info = fs.info(path)

filepaths_to_infos[path]["size"] = file_info["size"]
filepaths_to_infos[path]["type"] = file_info["type"]

elif fs.isdir(path):
files_info = fs.ls(path, detail=True)
@@ -322,7 +320,6 @@
path = file_info["name"]
path = _ensure_path_protocol(protocol, path)
filepaths_to_infos[path]["size"] = file_info["size"]
filepaths_to_infos[path]["type"] = file_info["type"]

else:
raise FileNotFoundError(f"File or directory not found: {path}")
@@ -342,9 +339,15 @@
for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas):
filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows

-return [
-    ListingInfo(path=_ensure_path_protocol(protocol, path), **infos) for path, infos in filepaths_to_infos.items()
-]
+file_paths = []
+file_sizes = []
+num_rows = []
+for path, infos in filepaths_to_infos.items():
+    file_paths.append(_ensure_path_protocol(protocol, path))
+    file_sizes.append(infos.get("size"))
+    num_rows.append(infos.get("rows"))
+
+return FileInfos.from_infos(file_paths=file_paths, file_sizes=file_sizes, num_rows=num_rows)


def _get_parquet_metadata_single(path: str) -> pa.parquet.FileMetadata:
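glob_path_with_stats now returns a FileInfos, the Rust-backed listing structure this PR exposes to Python, instead of a list of ListingInfo dataclasses. A rough usage sketch under the new API; the local path is illustrative, and passing file_format_config=None should simply skip the Parquet row-count enrichment shown above, leaving num_rows unset:

    import fsspec

    from daft.filesystem import glob_path_with_stats
    from daft.table import Table

    fs = fsspec.filesystem("file")
    file_infos = glob_path_with_stats("/data/*.parquet", file_format_config=None, fs=fs)

    print(len(file_infos))  # FileInfos supports len(): one entry per matched file
    # Convert to a columnar Table with "file_paths", "file_sizes", "num_rows" columns.
    table = Table._from_pytable(file_infos.to_table())
    print(table.to_pydict()["file_paths"])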
14 changes: 12 additions & 2 deletions daft/io/common.py
@@ -25,11 +25,21 @@ def _get_tabular_files_scan(
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
paths = path if isinstance(path, list) else [str(path)]
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None
+# Glob the path using the Runner
+runner_io = get_context().runner().runner_io()
+file_infos = runner_io.glob_paths_details(paths, file_format_config, fs)

+# Infer schema if no hints provided
+inferred_or_provided_schema = (
+    schema_hint
+    if schema_hint is not None
+    else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, fs)
+)
+# Construct plan
builder_cls = get_context().logical_plan_builder_class()
builder = builder_cls.from_tabular_scan(
-paths=paths,
-schema_hint=schema_hint,
+file_infos=file_infos,
+schema=inferred_or_provided_schema,
file_format_config=file_format_config,
fs=fs,
)
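_get_tabular_files_scan now does the work that both planner backends previously duplicated internally: glob once via the runner, infer the schema from the first file only when no hint was provided, then hand both the FileInfos and the schema to the builder. A hedged sketch of the hint path through the public reader API; the schema_hints spelling is assumed from this era of Daft:

    import daft
    from daft import DataType

    # With hints, runner_io.get_schema_from_first_filepath is never called;
    # the provided schema flows straight into from_tabular_scan as `schema`.
    df = daft.read_csv(
        "/data/events-*.csv",
        schema_hints={"user_id": DataType.int64(), "event": DataType.string()},
    )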
12 changes: 8 additions & 4 deletions daft/io/file_path.py
@@ -8,6 +8,8 @@
from daft.context import get_context
from daft.daft import PartitionScheme, PartitionSpec
from daft.dataframe import DataFrame
+from daft.runners.pyrunner import LocalPartitionSet
+from daft.table import Table


@PublicAPI
@@ -43,12 +45,14 @@ def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) ->
"""
context = get_context()
runner_io = context.runner().runner_io()
-partition_set = runner_io.glob_paths_details([path], fs=fs)
-cache_entry = context.runner().put_partition_set_into_cache(partition_set)
+file_infos = runner_io.glob_paths_details([path], fs=fs)
+file_infos_table = Table._from_pytable(file_infos.to_table())
+partition = LocalPartitionSet({0: file_infos_table})
+cache_entry = context.runner().put_partition_set_into_cache(partition)
builder_cls = context.logical_plan_builder_class()
builder = builder_cls.from_in_memory_scan(
cache_entry,
-schema=runner_io.FS_LISTING_SCHEMA,
-partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()),
+schema=file_infos_table.schema(),
+partition_spec=PartitionSpec(PartitionScheme.Unknown, partition.num_partitions()),
)
return DataFrame(builder)
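A short usage sketch of the public entrypoint changed above. After this PR, the resulting DataFrame's schema comes from the FileInfos table itself rather than the runner's FS_LISTING_SCHEMA, so (per the contract used throughout this change) the columns should be "file_paths", "file_sizes", and "num_rows":

    import daft

    # One row per matched file: its path, size in bytes, and row count when known.
    df = daft.from_glob_path("/data/**/*.parquet")
    df.show()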
5 changes: 3 additions & 2 deletions daft/logical/builder.py
@@ -9,6 +9,7 @@
from daft.daft import (
FileFormat,
FileFormatConfig,
+FileInfos,
JoinType,
PartitionScheme,
PartitionSpec,
@@ -80,9 +81,9 @@ def from_in_memory_scan(
def from_tabular_scan(
cls,
*,
-paths: list[str],
+file_infos: FileInfos,
+schema: Schema,
file_format_config: FileFormatConfig,
-schema_hint: Schema | None,
fs: fsspec.AbstractFileSystem | None,
) -> LogicalPlanBuilder:
pass
40 changes: 12 additions & 28 deletions daft/logical/logical_plan.py
@@ -13,6 +13,7 @@
from daft.daft import (
FileFormat,
FileFormatConfig,
+FileInfos,
JoinType,
PartitionScheme,
PartitionSpec,
@@ -28,6 +29,7 @@
from daft.logical.map_partition_ops import ExplodeOp, MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import PartitionCacheEntry
+from daft.runners.pyrunner import LocalPartitionSet
from daft.table import Table

if TYPE_CHECKING:
@@ -111,39 +113,30 @@
def from_tabular_scan(
cls,
*,
-paths: list[str],
+file_infos: FileInfos,
+schema: Schema,
file_format_config: FileFormatConfig,
-schema_hint: Schema | None,
fs: fsspec.AbstractFileSystem | None,
) -> PyLogicalPlanBuilder:
-# Glob the path using the Runner
-runner_io = get_context().runner().runner_io()
-file_info_partition_set = runner_io.glob_paths_details(paths, file_format_config, fs)
-
-# Infer schema if no hints provided
-inferred_or_provided_schema = (
-    schema_hint
-    if schema_hint is not None
-    else runner_io.get_schema_from_first_filepath(file_info_partition_set, file_format_config, fs)
-)
-cache_entry = get_context().runner().put_partition_set_into_cache(file_info_partition_set)
+file_infos_table = Table._from_pytable(file_infos.to_table())
+partition = LocalPartitionSet({0: file_infos_table})
+cache_entry = get_context().runner().put_partition_set_into_cache(partition)
filepath_plan = InMemoryScan(
cache_entry=cache_entry,
-schema=runner_io.FS_LISTING_SCHEMA,
-partition_spec=PartitionSpec(PartitionScheme.Unknown, file_info_partition_set.num_partitions()),
+schema=file_infos_table.schema(),
+partition_spec=PartitionSpec(PartitionScheme.Unknown, len(file_infos)),
)

return TabularFilesScan(
-schema=inferred_or_provided_schema,
+schema=schema,
predicate=None,
columns=None,
file_format_config=file_format_config,
fs=fs,
filepaths_child=filepath_plan,
-filepaths_column_name=runner_io.FS_LISTING_PATH_COLUMN_NAME,
# WARNING: This is currently hardcoded to be the same number of partitions as rows!! This is because we emit
# one partition per filepath. This will change in the future and our logic here should change accordingly.
-num_partitions=len(file_info_partition_set),
+num_partitions=len(file_infos),
).to_builder()

def project(
@@ -445,7 +438,6 @@
predicate: ExpressionsProjection | None = None,
columns: list[str] | None = None,
filepaths_child: LogicalPlan,
-filepaths_column_name: str,
num_partitions: int | None = None,
limit_rows: int | None = None,
) -> None:
@@ -472,12 +464,7 @@
self._fs = fs
self._limit_rows = limit_rows

-# TabularFilesScan has a single child node that provides the filepaths to read from.
-assert (
-    filepaths_child.schema()[filepaths_column_name] is not None
-), f"TabularFileScan requires a child with '{filepaths_column_name}' column"
self._register_child(filepaths_child)
-self._filepaths_column_name = filepaths_column_name

@property
def _filepaths_child(self) -> LogicalPlan:
@@ -493,7 +480,7 @@
)

def required_columns(self) -> list[set[str]]:
-return [{self._filepaths_column_name} | self._predicate.required_columns()]
+return [{"file_paths"} | self._predicate.required_columns()]

Check warning on line 483 in daft/logical/logical_plan.py (Codecov / codecov/patch): Added line #L483 was not covered by tests.
Review comment (Contributor):
Codecov comment is interesting; is this not covered by tests running on the old planner?

Reply (clarkzinzow, Contributor Author, Aug 30, 2023):
Hmm, it appears that it is not. The only place that it could happen is in the (Projection, LogicalPlan) optimization rule, and that rule doesn't apply to TabularFilesScan nodes:

    if isinstance(child, Projection) or isinstance(child, LocalAggregate) or isinstance(child, TabularFilesScan):
        return None


def input_mapping(self) -> list[dict[str, str]]:
return [dict()]
@@ -505,7 +492,6 @@
and self._predicate == other._predicate
and self._columns == other._columns
and self._file_format_config == other._file_format_config
-and self._filepaths_column_name == other._filepaths_column_name
)

def rebuild(self) -> LogicalPlan:
@@ -517,7 +503,6 @@
predicate=self._predicate if self._predicate is not None else None,
columns=self._column_names,
filepaths_child=child,
-filepaths_column_name=self._filepaths_column_name,
)

def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan:
@@ -529,7 +514,6 @@
predicate=self._predicate,
columns=self._column_names,
filepaths_child=new_children[0],
-filepaths_column_name=self._filepaths_column_name,
)


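On the PyRunner path, the listing is now materialized eagerly into a single in-memory partition that the plan fans out from, one downstream partition per file (hence the hardcoded num_partitions=len(file_infos) above). A condensed sketch of just that wiring; the helper name is hypothetical, but the calls are taken from this diff:

    from daft.daft import FileInfos
    from daft.runners.pyrunner import LocalPartitionSet
    from daft.table import Table


    def file_infos_to_partition_set(file_infos: FileInfos) -> LocalPartitionSet:
        # The whole listing lives in partition 0; TabularFilesScan then emits
        # one output partition per row, i.e. per file.
        file_infos_table = Table._from_pytable(file_infos.to_table())
        return LocalPartitionSet({0: file_infos_table})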
2 changes: 0 additions & 2 deletions daft/logical/optimizer.py
@@ -335,7 +335,6 @@ def _push_down_local_limit_into_scan(self, parent: LocalLimit, child: TabularFil
file_format_config=child._file_format_config,
fs=child._fs,
filepaths_child=child._filepaths_child,
-filepaths_column_name=child._filepaths_column_name,
limit_rows=new_limit_rows,
)
return new_scan
@@ -359,7 +358,6 @@ def _push_down_projections_into_scan(self, parent: Projection, child: TabularFil
file_format_config=child._file_format_config,
fs=child._fs,
filepaths_child=child._filepaths_child,
-filepaths_column_name=child._filepaths_column_name,
)
if any(not e._is_column() for e in parent._projection):
return parent.copy_with_new_children([new_scan])
25 changes: 4 additions & 21 deletions daft/logical/rust_logical_plan.py
@@ -6,8 +6,7 @@
import fsspec

from daft import DataType, col
-from daft.context import get_context
-from daft.daft import CountMode, FileFormat, FileFormatConfig, JoinType
+from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest
from daft.errors import ExpressionTypeError
@@ -62,30 +61,14 @@ def from_in_memory_scan(
def from_tabular_scan(
cls,
*,
-paths: list[str],
+file_infos: FileInfos,
+schema: Schema,
file_format_config: FileFormatConfig,
-schema_hint: Schema | None,
fs: fsspec.AbstractFileSystem | None,
) -> RustLogicalPlanBuilder:
if fs is not None:
raise ValueError("fsspec filesystems not supported for Rust query planner.")
-# Glob the path using the Runner
-runner_io = get_context().runner().runner_io()
-file_info_partition_set = runner_io.glob_paths_details(paths, file_format_config, fs)
-
-# Infer schema if no hints provided
-inferred_or_provided_schema = (
-    schema_hint
-    if schema_hint is not None
-    else runner_io.get_schema_from_first_filepath(file_info_partition_set, file_format_config, fs)
-)
-paths_details = file_info_partition_set.to_pydict()
-filepaths = paths_details[runner_io.FS_LISTING_PATH_COLUMN_NAME]
-filesizes = paths_details[runner_io.FS_LISTING_SIZE_COLUMN_NAME]
-filerows = paths_details[runner_io.FS_LISTING_ROWS_COLUMN_NAME]
-
-rs_schema = inferred_or_provided_schema._schema
-builder = _LogicalPlanBuilder.table_scan(filepaths, filesizes, filerows, rs_schema, file_format_config)
+builder = _LogicalPlanBuilder.table_scan(file_infos, schema._schema, file_format_config)
return cls(builder)

def project(
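On the Rust planner path, the shim no longer unzips the listing into parallel filepaths/filesizes/filerows lists or re-derives column names from the runner; the FileInfos handle crosses into the native builder whole. A minimal sketch of driving the new signature end to end; schema and file_format_config are assumed to have been produced upstream (e.g. in _get_tabular_files_scan), and the literal values are illustrative:

    from daft.daft import FileInfos
    from daft.logical.rust_logical_plan import RustLogicalPlanBuilder

    file_infos = FileInfos.from_infos(
        file_paths=["/data/a.parquet", "/data/b.parquet"],
        file_sizes=[1024, 2048],  # entries may be None when sizes are unknown
        num_rows=[100, None],     # None when Parquet metadata was not fetched
    )
    builder = RustLogicalPlanBuilder.from_tabular_scan(
        file_infos=file_infos,
        schema=schema,  # a daft Schema, inferred or hinted upstream
        file_format_config=file_format_config,
        fs=None,  # fsspec filesystems are rejected on this path
    )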