[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration. (#1559)
This PR adds an end-to-end (e2e) integration of the new `ScanOperator` for reading
from external sources, wiring it through logical plan building, logical
-> physical plan translation, physical plan scheduling, physical task
execution, and the actual `MicroPartition`-based reads.
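
For a quick way to exercise the new path, here is a minimal, purely illustrative sketch that flips the feature flags added in `daft/io/common.py`; the bucket path is a placeholder and nothing in this snippet is required by the PR itself:

```python
import os

# Opt in to the new scan path; DAFT_V2_SCANS=1 requires DAFT_MICROPARTITIONS=1.
os.environ["DAFT_MICROPARTITIONS"] = "1"
os.environ["DAFT_V2_SCANS"] = "1"

import daft

# A single string path takes the ScanOperatorHandle.glob_scan route; a list of
# paths falls back to the eagerly-globbed anonymous_scan route.
df = daft.read_parquet("s3://my-bucket/data/*.parquet", use_native_downloader=True)
df.show()
```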

## TODOs (possibly before merging)

- [ ] Implement Python I/O backend at `MicroPartition` level.
- [ ] Implement reads for non-Parquet formats at `MicroPartition` level.
- [x] Consolidate filter/limit pushdowns to use the same `Pushdown`
struct.
- [x] Look to reinstate non-optional `TableMetadata` at the
`MicroPartition` level. (#1563)
- [x] Look to reinstate non-optional `TableStatistics` when data is
unloaded at the `MicroPartition` level. (#1563)
- [x] Integrate with globbing `ScanOperator` implementation. (#1564)
- [ ] Support different row group selection per Parquet file (currently
applies a single row group selection to all files in a scan task
bundle).
- [ ] Misc. cleanup.
- [ ] (?) Add basic validation that `ScanTask` configurations are
compatible when merging into a `ScanTaskBatch` bundle.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Jay Chia <[email protected]>
3 people authored Nov 7, 2023
1 parent c8fe883 commit e176f2e
Showing 53 changed files with 1,353 additions and 598 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

51 changes: 46 additions & 5 deletions daft/daft.pyi
@@ -178,10 +178,7 @@ class ParquetSourceConfig:
Configuration of a Parquet data source.
"""

# Whether or not to use a multithreaded tokio runtime for processing I/O
multithreaded_io: bool

def __init__(self, multithreaded_io: bool): ...
def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ...

class CsvSourceConfig:
"""
@@ -339,9 +336,11 @@ class NativeStorageConfig:
Storage configuration for the Rust-native I/O layer.
"""

# Whether or not to use a multithreaded tokio runtime for processing I/O
multithreaded_io: bool
io_config: IOConfig

def __init__(self, io_config: IOConfig | None = None): ...
def __init__(self, multithreaded_io: bool, io_config: IOConfig | None = None): ...

class PythonStorageConfig:
"""
@@ -374,6 +373,42 @@ class StorageConfig:
@property
def config(self) -> NativeStorageConfig | PythonStorageConfig: ...

class ScanTask:
"""
A batch of scan tasks for reading data from an external source.
"""

def num_rows(self) -> int:
"""
Get number of rows that will be scanned by this ScanTask.
"""
...
def size_bytes(self) -> int:
"""
Get number of bytes that will be scanned by this ScanTask.
"""
...

class ScanOperatorHandle:
"""
A handle to a scan operator.
"""

@staticmethod
def anonymous_scan(
files: list[str],
schema: PySchema,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
) -> ScanOperatorHandle: ...
@staticmethod
def glob_scan(
glob_path: str,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
schema: PySchema | None = None,
) -> ScanOperatorHandle: ...

def read_parquet(
uri: str,
columns: list[str] | None = None,
@@ -722,6 +757,8 @@ class PyMicroPartition:
@staticmethod
def empty(schema: PySchema | None = None) -> PyMicroPartition: ...
@staticmethod
def from_scan_task(scan_task: ScanTask) -> PyMicroPartition: ...
@staticmethod
def from_tables(tables: list[PyTable]) -> PyMicroPartition: ...
@staticmethod
def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyMicroPartition: ...
@@ -814,6 +851,10 @@ class LogicalPlanBuilder:
partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan_with_scan_operator(
scan_operator: ScanOperatorHandle, schema_hint: PySchema | None
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
) -> LogicalPlanBuilder: ...
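
Taken together, the new stubs describe the low-level flow: build a `ScanOperatorHandle` from explicit file paths (`anonymous_scan`) or from a glob (`glob_scan`), then hand it to `LogicalPlanBuilder.table_scan_with_scan_operator`. A rough, hypothetical wiring using only the constructors shown above; the glob path is a placeholder, `io_config` is omitted for brevity, and in practice this is driven through `daft.io.common._get_tabular_files_scan` rather than called directly:

```python
from daft.daft import (
    FileFormatConfig,
    LogicalPlanBuilder,
    NativeStorageConfig,
    ParquetSourceConfig,
    ScanOperatorHandle,
    StorageConfig,
)

# Parquet source + native (Rust) I/O with a multithreaded tokio runtime.
file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
storage_config = StorageConfig.native(NativeStorageConfig(True, None))

# Lazily-globbing scan operator; the schema is inferred when no hint is given.
scan_op = ScanOperatorHandle.glob_scan(
    "s3://my-bucket/data/*.parquet",
    file_format_config,
    storage_config,
    schema=None,
)
builder = LogicalPlanBuilder.table_scan_with_scan_operator(scan_op, None)
```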
1 change: 0 additions & 1 deletion daft/execution/execution_step.py
@@ -404,7 +404,6 @@ def _handle_tabular_files_scan(
schema=self.schema,
storage_config=self.storage_config,
read_options=read_options,
multithreaded_io=format_config.multithreaded_io,
)
for fp in filepaths
]
44 changes: 44 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator, TypeVar, cast

from daft.daft import (
@@ -10,17 +11,60 @@
PySchema,
PyTable,
ResourceRequest,
ScanTask,
StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
from daft.table import Table

PartitionT = TypeVar("PartitionT")


def scan_with_tasks(
scan_tasks: list[ScanTask],
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
"""child_plan represents partitions with filenames.
Yield a plan to read those filenames.
"""
# TODO(Clark): Currently hardcoded to have 1 file per instruction
# We can instead right-size and bundle the ScanTask into single-instruction bulk reads.
for scan_task in scan_tasks:
scan_step = execution_step.PartitionTaskBuilder[PartitionT](inputs=[], partial_metadatas=None,).add_instruction(
instruction=ScanWithTask(scan_task),
# Set the filesize as the memory request.
# (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
resource_request=ResourceRequest(memory_bytes=scan_task.size_bytes()),
)
yield scan_step


@dataclass(frozen=True)
class ScanWithTask(execution_step.SingleOutputInstruction):
scan_task: ScanTask

def run(self, inputs: list[Table]) -> list[Table]:
return self._scan(inputs)

def _scan(self, inputs: list[Table]) -> list[Table]:
assert len(inputs) == 0
return [Table._from_scan_task(self.scan_task)]

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
assert len(input_metadatas) == 0

return [
PartialPartitionMetadata(
num_rows=self.scan_task.num_rows(),
size_bytes=None,
)
]


def tabular_scan(
schema: PySchema,
columns_to_read: list[str] | None,
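
A hypothetical snippet illustrating the new instruction's metadata path, assuming `scan_task` is a `daft.daft.ScanTask` already produced by the planner (the read itself happens in `Table._from_scan_task`, which is `MicroPartition`-backed when `DAFT_MICROPARTITIONS=1`):

```python
from daft.execution.rust_physical_plan_shim import ScanWithTask

instr = ScanWithTask(scan_task)

# Metadata-only estimate: num_rows comes from the ScanTask; size stays unknown.
[estimate] = instr.run_partial_metadata([])
assert estimate.num_rows == scan_task.num_rows()

# size_bytes() is also what scan_with_tasks uses as each step's memory request.
print(scan_task.size_bytes())
```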
2 changes: 1 addition & 1 deletion daft/io/_csv.py
@@ -70,7 +70,7 @@ def read_csv(
)
file_format_config = FileFormatConfig.from_csv_config(csv_config)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
8 changes: 2 additions & 6 deletions daft/io/_parquet.py
@@ -54,13 +54,9 @@ def read_parquet(
# This is because each Ray worker process receives its own pool of thread workers and connections
multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io

file_format_config = FileFormatConfig.from_parquet_config(
ParquetSourceConfig(
multithreaded_io=multithreaded_io,
)
)
file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))

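
The net effect here (together with the `daft.pyi` change above) is that the multithreaded-I/O switch moves off the Parquet source config and onto the native storage config. A hedged before/after sketch of the constructor calls, with `io_config` left as `None` for brevity:

```python
from daft.daft import NativeStorageConfig, ParquetSourceConfig, StorageConfig

# Before: the flag rode on the Parquet source config.
#   ParquetSourceConfig(multithreaded_io=True)

# After: ParquetSourceConfig carries only Parquet-specific options ...
parquet_config = ParquetSourceConfig(coerce_int96_timestamp_unit=None, row_groups=None)
# ... and the runtime choice lives on NativeStorageConfig (multithreaded_io, io_config).
storage_config = StorageConfig.native(NativeStorageConfig(True, None))
```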
53 changes: 50 additions & 3 deletions daft/io/common.py
@@ -1,12 +1,14 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import (
FileFormatConfig,
NativeStorageConfig,
PythonStorageConfig,
ScanOperatorHandle,
StorageConfig,
)
from daft.datatype import DataType
@@ -31,9 +33,6 @@ def _get_tabular_files_scan(
storage_config: StorageConfig,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
paths = path if isinstance(path, list) else [str(path)]
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

# Glob the path using the Runner
# NOTE: Globbing will always need the IOConfig, regardless of whether "native reads" are used
io_config = None
@@ -44,6 +43,54 @@
else:
raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}")

schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

### FEATURE_FLAG: $DAFT_V2_SCANS
#
# This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans
if os.getenv("DAFT_V2_SCANS", "0") == "1":
assert (
os.getenv("DAFT_MICROPARTITIONS", "0") == "1"
), "DAFT_V2_SCANS=1 requires DAFT_MICROPARTITIONS=1 to be set as well"

scan_op: ScanOperatorHandle
if isinstance(path, list):
# Eagerly glob each path and fall back to the AnonymousScanOperator.
# NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config)

# TODO: Should we move this into the AnonymousScanOperator itself?
# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)

scan_op = ScanOperatorHandle.anonymous_scan(
file_infos.file_paths,
inferred_or_provided_schema._schema,
file_format_config,
storage_config,
)
elif isinstance(path, str):
scan_op = ScanOperatorHandle.glob_scan(
path,
file_format_config,
storage_config,
schema=schema_hint._schema if schema_hint is not None else None,
)
else:
raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}")

builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
scan_operator=scan_op,
schema_hint=schema_hint,
)
return builder

paths = path if isinstance(path, list) else [str(path)]
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

18 changes: 17 additions & 1 deletion daft/logical/builder.py
@@ -5,7 +5,12 @@

from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import PartitionScheme, ResourceRequest, StorageConfig
from daft.daft import (
PartitionScheme,
ResourceRequest,
ScanOperatorHandle,
StorageConfig,
)
from daft.expressions import Expression, col
from daft.logical.schema import Schema
from daft.runners.partitioning import PartitionCacheEntry
@@ -66,6 +71,17 @@ def from_in_memory_scan(
builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, num_partitions)
return cls(builder)

@classmethod
def from_tabular_scan_with_scan_operator(
cls,
*,
scan_operator: ScanOperatorHandle,
schema_hint: Schema | None,
) -> LogicalPlanBuilder:
pyschema = schema_hint._schema if schema_hint is not None else None
builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema)
return cls(builder)

@classmethod
def from_tabular_scan(
cls,
6 changes: 6 additions & 0 deletions daft/table/micropartition.py
@@ -8,6 +8,7 @@
from daft.daft import IOConfig, JoinType
from daft.daft import PyMicroPartition as _PyMicroPartition
from daft.daft import PyTable as _PyTable
from daft.daft import ScanTask as _ScanTask
from daft.datatype import DataType, TimeUnit
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
@@ -64,6 +65,11 @@ def empty(schema: Schema | None = None) -> MicroPartition:
pyt = _PyMicroPartition.empty(None) if schema is None else _PyMicroPartition.empty(schema._schema)
return MicroPartition._from_pymicropartition(pyt)

@staticmethod
def _from_scan_task(scan_task: _ScanTask) -> MicroPartition:
assert isinstance(scan_task, _ScanTask)
return MicroPartition._from_pymicropartition(_PyMicroPartition.from_scan_task(scan_task))

@staticmethod
def _from_pytable(pyt: _PyTable) -> MicroPartition:
assert isinstance(pyt, _PyTable)
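
A brief, hypothetical usage sketch of the new constructor, assuming `scan_task` is a `daft.daft.ScanTask` handed over from the planner; the resulting `MicroPartition` otherwise behaves like an eager `Table`:

```python
from daft.table.micropartition import MicroPartition

# Materialize the scan task into an in-memory MicroPartition.
mp = MicroPartition._from_scan_task(scan_task)
print(mp.schema())
print(len(mp))
```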
5 changes: 5 additions & 0 deletions daft/table/table.py
@@ -8,6 +8,7 @@
from daft.arrow_utils import ensure_table
from daft.daft import JoinType
from daft.daft import PyTable as _PyTable
from daft.daft import ScanTask as _ScanTask
from daft.daft import read_csv as _read_csv
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
@@ -78,6 +79,10 @@ def empty(schema: Schema | None = None) -> Table:
pyt = _PyTable.empty(None) if schema is None else _PyTable.empty(schema._schema)
return Table._from_pytable(pyt)

@staticmethod
def _from_scan_task(_: _ScanTask) -> Table:
raise NotImplementedError("_from_scan_task is not implemented for legacy Python Table.")

@staticmethod
def _from_pytable(pyt: _PyTable) -> Table:
assert isinstance(pyt, _PyTable)
3 changes: 1 addition & 2 deletions daft/table/table_io.py
@@ -106,7 +106,6 @@ def read_parquet(
storage_config: StorageConfig | None = None,
read_options: TableReadOptions = TableReadOptions(),
parquet_options: TableParseParquetOptions = TableParseParquetOptions(),
multithreaded_io: bool | None = None,
) -> Table:
"""Reads a Table from a Parquet file
@@ -131,7 +130,7 @@
num_rows=read_options.num_rows,
io_config=config.io_config,
coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit,
multithreaded_io=multithreaded_io,
multithreaded_io=config.multithreaded_io,
)
return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)

1 change: 1 addition & 0 deletions src/daft-micropartition/Cargo.toml
@@ -11,6 +11,7 @@ daft-scan = {path = "../daft-scan", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
indexmap = {workspace = true, features = ["serde"]}
log = {workspace = true}
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true}
(Remaining changed files are not rendered in this view.)