From e176f2e3e9875ada6d3a3cc96308934f564a2e0c Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Mon, 6 Nov 2023 19:28:56 -0800 Subject: [PATCH] [FEAT][ScanOperator 1/3] Add MVP e2e `ScanOperator` integration. (#1559) This PR adds an e2e integration for the new `ScanOperator` for reading from external sources, integrating with logical plan building, logical -> physical plan translation, physical plan scheduling, physical task execution, and the actual `MicroPartition`-based reading. ## TODOs (possibly before merging) - [ ] Implement Python I/O backend at `MicroPartition` level. - [ ] Implement reads for non-Parquet formats at `MicroPartition` level. - [x] Consolidate filter/limit pushdowns to use the same `Pushdown` struct. - [x] Look to reinstate non-optional `TableMetadata` at the `MicroPartition` level. (#1563) - [x] Look to reinstate non-optional `TableStatistics` when data is unloaded at the `MicroPartition` level. (#1563) - [x] Integrate with globbing `ScanOperator` implementation. (#1564) - [ ] Support different row group selection per Parquet file (currently applies a single row group selection to all files in a scan task bundle). - [ ] Misc. cleanup. - [ ] (?) Add basic validation that `ScanTask` configurations are compatible when merging into a `ScanTaskBatch` bundle. --------- Co-authored-by: Jay Chia Co-authored-by: Jay Chia <17691182+jaychia@users.noreply.github.com> --- Cargo.lock | 5 + daft/daft.pyi | 51 ++- daft/execution/execution_step.py | 1 - daft/execution/rust_physical_plan_shim.py | 44 ++ daft/io/_csv.py | 2 +- daft/io/_parquet.py | 8 +- daft/io/common.py | 53 ++- daft/logical/builder.py | 18 +- daft/table/micropartition.py | 6 + daft/table/table.py | 5 + daft/table/table_io.py | 3 +- src/daft-micropartition/Cargo.toml | 1 + src/daft-micropartition/src/micropartition.rs | 415 +++++++++++++----- src/daft-micropartition/src/ops/agg.rs | 16 +- .../src/ops/cast_to_schema.rs | 13 +- .../src/ops/eval_expressions.rs | 19 +- src/daft-micropartition/src/ops/filter.rs | 14 +- src/daft-micropartition/src/ops/join.rs | 10 +- src/daft-micropartition/src/ops/partition.rs | 10 +- src/daft-micropartition/src/ops/slice.rs | 17 +- src/daft-micropartition/src/ops/sort.rs | 7 +- src/daft-micropartition/src/ops/take.rs | 25 +- src/daft-micropartition/src/python.rs | 64 ++- src/daft-plan/Cargo.toml | 1 + src/daft-plan/src/builder.rs | 60 ++- src/daft-plan/src/lib.rs | 16 +- src/daft-plan/src/logical_ops/source.rs | 21 +- src/daft-plan/src/optimization/optimizer.rs | 2 +- .../optimization/rules/drop_repartition.rs | 2 +- .../optimization/rules/push_down_filter.rs | 30 +- .../src/optimization/rules/push_down_limit.rs | 47 +- .../rules/push_down_projection.rs | 37 +- src/daft-plan/src/physical_ops/csv.rs | 6 +- src/daft-plan/src/physical_ops/json.rs | 6 +- src/daft-plan/src/physical_ops/mod.rs | 2 + src/daft-plan/src/physical_ops/parquet.rs | 2 +- src/daft-plan/src/physical_ops/scan.rs | 21 + src/daft-plan/src/physical_plan.rs | 28 +- src/daft-plan/src/planner.rs | 30 +- src/daft-plan/src/source_info/mod.rs | 27 +- src/daft-plan/src/test/mod.rs | 9 +- src/daft-scan/Cargo.toml | 5 +- src/daft-scan/src/anonymous.rs | 99 ++--- .../src}/file_format.rs | 68 ++- src/daft-scan/src/glob.rs | 231 ++++++---- src/daft-scan/src/lib.rs | 267 ++++++++--- .../src}/py_object_serde.rs | 11 +- src/daft-scan/src/python.rs | 91 +++- .../src}/storage_config.rs | 17 +- src/daft-stats/src/partition_spec.rs | 2 +- src/daft-stats/src/table_metadata.rs | 2 +- tests/table/table_io/test_csv.py | 2 +- 
tests/table/table_io/test_parquet.py | 2 +- 53 files changed, 1353 insertions(+), 598 deletions(-) create mode 100644 src/daft-plan/src/physical_ops/scan.rs rename src/{daft-plan/src/source_info => daft-scan/src}/file_format.rs (74%) rename src/{daft-plan/src/source_info => daft-scan/src}/py_object_serde.rs (88%) rename src/{daft-plan/src/source_info => daft-scan/src}/storage_config.rs (91%) diff --git a/Cargo.lock b/Cargo.lock index 5763bfaebf..95ee5698e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1224,6 +1224,7 @@ dependencies = [ "daft-stats", "daft-table", "indexmap 2.0.2", + "log", "parquet2", "pyo3", "pyo3-log", @@ -1269,6 +1270,7 @@ dependencies = [ "common-io-config", "daft-core", "daft-dsl", + "daft-scan", "daft-table", "indexmap 2.0.2", "log", @@ -1284,7 +1286,9 @@ dependencies = [ name = "daft-scan" version = "0.1.10" dependencies = [ + "bincode", "common-error", + "common-io-config", "daft-core", "daft-csv", "daft-dsl", @@ -1295,6 +1299,7 @@ dependencies = [ "pyo3", "pyo3-log", "serde", + "serde_json", "snafu", "tokio", ] diff --git a/daft/daft.pyi b/daft/daft.pyi index ecc1cb11d0..3f15e13c31 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -178,10 +178,7 @@ class ParquetSourceConfig: Configuration of a Parquet data source. """ - # Whether or not to use a multithreaded tokio runtime for processing I/O - multithreaded_io: bool - - def __init__(self, multithreaded_io: bool): ... + def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ... class CsvSourceConfig: """ @@ -339,9 +336,11 @@ class NativeStorageConfig: Storage configuration for the Rust-native I/O layer. """ + # Whether or not to use a multithreaded tokio runtime for processing I/O + multithreaded_io: bool io_config: IOConfig - def __init__(self, io_config: IOConfig | None = None): ... + def __init__(self, multithreaded_io: bool, io_config: IOConfig | None = None): ... class PythonStorageConfig: """ @@ -374,6 +373,42 @@ class StorageConfig: @property def config(self) -> NativeStorageConfig | PythonStorageConfig: ... +class ScanTask: + """ + A batch of scan tasks for reading data from an external source. + """ + + def num_rows(self) -> int: + """ + Get number of rows that will be scanned by this ScanTask. + """ + ... + def size_bytes(self) -> int: + """ + Get number of bytes that will be scanned by this ScanTask. + """ + ... + +class ScanOperatorHandle: + """ + A handle to a scan operator. + """ + + @staticmethod + def anonymous_scan( + files: list[str], + schema: PySchema, + file_format_config: FileFormatConfig, + storage_config: StorageConfig, + ) -> ScanOperatorHandle: ... + @staticmethod + def glob_scan( + glob_path: str, + file_format_config: FileFormatConfig, + storage_config: StorageConfig, + schema: PySchema | None = None, + ) -> ScanOperatorHandle: ... + def read_parquet( uri: str, columns: list[str] | None = None, @@ -722,6 +757,8 @@ class PyMicroPartition: @staticmethod def empty(schema: PySchema | None = None) -> PyMicroPartition: ... @staticmethod + def from_scan_task(scan_task: ScanTask) -> PyMicroPartition: ... + @staticmethod def from_tables(tables: list[PyTable]) -> PyMicroPartition: ... @staticmethod def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyMicroPartition: ... @@ -814,6 +851,10 @@ class LogicalPlanBuilder: partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int ) -> LogicalPlanBuilder: ... 
@staticmethod + def table_scan_with_scan_operator( + scan_operator: ScanOperatorHandle, schema_hint: PySchema | None + ) -> LogicalPlanBuilder: ... + @staticmethod def table_scan( file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig ) -> LogicalPlanBuilder: ... diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index da03267d5e..998bcbb2df 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -404,7 +404,6 @@ def _handle_tabular_files_scan( schema=self.schema, storage_config=self.storage_config, read_options=read_options, - multithreaded_io=format_config.multithreaded_io, ) for fp in filepaths ] diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index b0aa071d47..2e3d0ace20 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -1,5 +1,6 @@ from __future__ import annotations +from dataclasses import dataclass from typing import Iterator, TypeVar, cast from daft.daft import ( @@ -10,17 +11,60 @@ PySchema, PyTable, ResourceRequest, + ScanTask, StorageConfig, ) from daft.execution import execution_step, physical_plan from daft.expressions import Expression, ExpressionsProjection from daft.logical.map_partition_ops import MapPartitionOp from daft.logical.schema import Schema +from daft.runners.partitioning import PartialPartitionMetadata from daft.table import Table PartitionT = TypeVar("PartitionT") +def scan_with_tasks( + scan_tasks: list[ScanTask], +) -> physical_plan.InProgressPhysicalPlan[PartitionT]: + """child_plan represents partitions with filenames. + + Yield a plan to read those filenames. + """ + # TODO(Clark): Currently hardcoded to have 1 file per instruction + # We can instead right-size and bundle the ScanTask into single-instruction bulk reads. + for scan_task in scan_tasks: + scan_step = execution_step.PartitionTaskBuilder[PartitionT](inputs=[], partial_metadatas=None,).add_instruction( + instruction=ScanWithTask(scan_task), + # Set the filesize as the memory request. + # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.) 
+ resource_request=ResourceRequest(memory_bytes=scan_task.size_bytes()), + ) + yield scan_step + + +@dataclass(frozen=True) +class ScanWithTask(execution_step.SingleOutputInstruction): + scan_task: ScanTask + + def run(self, inputs: list[Table]) -> list[Table]: + return self._scan(inputs) + + def _scan(self, inputs: list[Table]) -> list[Table]: + assert len(inputs) == 0 + return [Table._from_scan_task(self.scan_task)] + + def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]: + assert len(input_metadatas) == 0 + + return [ + PartialPartitionMetadata( + num_rows=self.scan_task.num_rows(), + size_bytes=None, + ) + ] + + def tabular_scan( schema: PySchema, columns_to_read: list[str] | None, diff --git a/daft/io/_csv.py b/daft/io/_csv.py index babb6c4c35..ff9ea0fd48 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -70,7 +70,7 @@ def read_csv( ) file_format_config = FileFormatConfig.from_csv_config(csv_config) if use_native_downloader: - storage_config = StorageConfig.native(NativeStorageConfig(io_config)) + storage_config = StorageConfig.native(NativeStorageConfig(True, io_config)) else: storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config)) builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index e14b751870..43dfa4053b 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -54,13 +54,9 @@ def read_parquet( # This is because each Ray worker process receives its own pool of thread workers and connections multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io - file_format_config = FileFormatConfig.from_parquet_config( - ParquetSourceConfig( - multithreaded_io=multithreaded_io, - ) - ) + file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig()) if use_native_downloader: - storage_config = StorageConfig.native(NativeStorageConfig(io_config)) + storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) else: storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config)) diff --git a/daft/io/common.py b/daft/io/common.py index 7709d9d831..68bf279b91 100644 --- a/daft/io/common.py +++ b/daft/io/common.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from typing import TYPE_CHECKING from daft.context import get_context @@ -7,6 +8,7 @@ FileFormatConfig, NativeStorageConfig, PythonStorageConfig, + ScanOperatorHandle, StorageConfig, ) from daft.datatype import DataType @@ -31,9 +33,6 @@ def _get_tabular_files_scan( storage_config: StorageConfig, ) -> LogicalPlanBuilder: """Returns a TabularFilesScan LogicalPlan for a given glob filepath.""" - paths = path if isinstance(path, list) else [str(path)] - schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None - # Glob the path using the Runner # NOTE: Globbing will always need the IOConfig, regardless of whether "native reads" are used io_config = None @@ -44,6 +43,54 @@ def _get_tabular_files_scan( else: raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}") + schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None + + ### FEATURE_FLAG: $DAFT_V2_SCANS + # + # This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans + if 
os.getenv("DAFT_V2_SCANS", "0") == "1": + assert ( + os.getenv("DAFT_MICROPARTITIONS", "0") == "1" + ), "DAFT_V2_SCANS=1 requires DAFT_MICROPARTITIONS=1 to be set as well" + + scan_op: ScanOperatorHandle + if isinstance(path, list): + # Eagerly globs each path and fallback to AnonymousScanOperator. + # NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams + runner_io = get_context().runner().runner_io() + file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config) + + # TODO: Should we move this into the AnonymousScanOperator itself? + # Infer schema if no hints provided + inferred_or_provided_schema = ( + schema_hint + if schema_hint is not None + else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config) + ) + + scan_op = ScanOperatorHandle.anonymous_scan( + file_infos.file_paths, + inferred_or_provided_schema._schema, + file_format_config, + storage_config, + ) + elif isinstance(path, str): + scan_op = ScanOperatorHandle.glob_scan( + path, + file_format_config, + storage_config, + schema=schema_hint._schema if schema_hint is not None else None, + ) + else: + raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}") + + builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator( + scan_operator=scan_op, + schema_hint=schema_hint, + ) + return builder + + paths = path if isinstance(path, list) else [str(path)] runner_io = get_context().runner().runner_io() file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config) diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 311c5e29fd..2447e54333 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -5,7 +5,12 @@ from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder -from daft.daft import PartitionScheme, ResourceRequest, StorageConfig +from daft.daft import ( + PartitionScheme, + ResourceRequest, + ScanOperatorHandle, + StorageConfig, +) from daft.expressions import Expression, col from daft.logical.schema import Schema from daft.runners.partitioning import PartitionCacheEntry @@ -66,6 +71,17 @@ def from_in_memory_scan( builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, num_partitions) return cls(builder) + @classmethod + def from_tabular_scan_with_scan_operator( + cls, + *, + scan_operator: ScanOperatorHandle, + schema_hint: Schema | None, + ) -> LogicalPlanBuilder: + pyschema = schema_hint._schema if schema_hint is not None else None + builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema) + return cls(builder) + @classmethod def from_tabular_scan( cls, diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index af25430725..05ccc545f4 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -8,6 +8,7 @@ from daft.daft import IOConfig, JoinType from daft.daft import PyMicroPartition as _PyMicroPartition from daft.daft import PyTable as _PyTable +from daft.daft import ScanTask as _ScanTask from daft.datatype import DataType, TimeUnit from daft.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema @@ -64,6 +65,11 @@ def empty(schema: Schema | None = None) -> MicroPartition: pyt = _PyMicroPartition.empty(None) if schema is None else 
_PyMicroPartition.empty(schema._schema) return MicroPartition._from_pymicropartition(pyt) + @staticmethod + def _from_scan_task(scan_task: _ScanTask) -> MicroPartition: + assert isinstance(scan_task, _ScanTask) + return MicroPartition._from_pymicropartition(_PyMicroPartition.from_scan_task(scan_task)) + @staticmethod def _from_pytable(pyt: _PyTable) -> MicroPartition: assert isinstance(pyt, _PyTable) diff --git a/daft/table/table.py b/daft/table/table.py index d4e28fb80a..30491e4c8f 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -8,6 +8,7 @@ from daft.arrow_utils import ensure_table from daft.daft import JoinType from daft.daft import PyTable as _PyTable +from daft.daft import ScanTask as _ScanTask from daft.daft import read_csv as _read_csv from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk @@ -78,6 +79,10 @@ def empty(schema: Schema | None = None) -> Table: pyt = _PyTable.empty(None) if schema is None else _PyTable.empty(schema._schema) return Table._from_pytable(pyt) + @staticmethod + def _from_scan_task(_: _ScanTask) -> Table: + raise NotImplementedError("_from_scan_task is not implemented for legacy Python Table.") + @staticmethod def _from_pytable(pyt: _PyTable) -> Table: assert isinstance(pyt, _PyTable) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 90eaa3d5e3..bff7c028b2 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -106,7 +106,6 @@ def read_parquet( storage_config: StorageConfig | None = None, read_options: TableReadOptions = TableReadOptions(), parquet_options: TableParseParquetOptions = TableParseParquetOptions(), - multithreaded_io: bool | None = None, ) -> Table: """Reads a Table from a Parquet file @@ -131,7 +130,7 @@ def read_parquet( num_rows=read_options.num_rows, io_config=config.io_config, coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit, - multithreaded_io=multithreaded_io, + multithreaded_io=config.multithreaded_io, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) diff --git a/src/daft-micropartition/Cargo.toml b/src/daft-micropartition/Cargo.toml index b5eb54f447..f74c46a53c 100644 --- a/src/daft-micropartition/Cargo.toml +++ b/src/daft-micropartition/Cargo.toml @@ -11,6 +11,7 @@ daft-scan = {path = "../daft-scan", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} indexmap = {workspace = true, features = ["serde"]} +log = {workspace = true} parquet2 = {workspace = true} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true} diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index b3b45d44e3..ac3d5f69d0 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -11,9 +11,11 @@ use daft_csv::read::read_csv; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; +use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig}; +use daft_scan::storage_config::{NativeStorageConfig, StorageConfig}; +use daft_scan::{DataFileSource, ScanTask}; use daft_table::Table; -use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::DaftCoreComputeSnafu; @@ -22,33 +24,24 @@ use daft_io::{IOConfig, IOStatsRef}; use daft_stats::TableMetadata; use daft_stats::TableStatistics; -#[derive(Clone, Serialize, 
Deserialize)] -enum FormatParams { - Parquet { - row_groups: Option>>, - inference_options: ParquetSchemaInferenceOptions, - }, -} - -#[derive(Clone, Serialize, Deserialize)] -pub(crate) struct DeferredLoadingParams { - format_params: FormatParams, - urls: Vec, - io_config: Arc, - multithreaded_io: bool, - limit: Option, - columns: Option>, -} pub(crate) enum TableState { - Unloaded(DeferredLoadingParams), + Unloaded(Arc), Loaded(Arc>), } impl Display for TableState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - TableState::Unloaded(params) => { - write!(f, "TableState: Unloaded. To load from: {:?}", params.urls) + TableState::Unloaded(scan_task) => { + write!( + f, + "TableState: Unloaded. To load from: {:#?}", + scan_task + .sources + .iter() + .map(|s| s.get_path()) + .collect::>() + ) } TableState::Loaded(tables) => { writeln!(f, "TableState: Loaded. {} tables", tables.len())?; @@ -67,47 +60,257 @@ pub(crate) struct MicroPartition { pub(crate) statistics: Option, } +/// Helper to run all the IO and compute required to materialize a ScanTask into a Vec +/// +/// # Arguments +/// +/// * `scan_task` - a batch of ScanTasks to materialize as Tables +/// * `cast_to_schema` - an Optional schema to cast all the resulting Tables to. If not provided, will use the schema +/// provided by the ScanTask +/// * `io_stats` - an optional IOStats object to record the IO operations performed +fn materialize_scan_task( + scan_task: Arc, + cast_to_schema: Option, + io_stats: Option, +) -> crate::Result> { + log::debug!("Materializing ScanTask: {scan_task:?}"); + + let column_names = scan_task + .columns + .as_ref() + .map(|v| v.iter().map(|s| s.as_ref()).collect::>()); + let urls = scan_task.sources.iter().map(|s| s.get_path()); + + // Schema to cast resultant tables into, ensuring that all Tables have the same schema. + // Note that we need to apply column pruning here if specified by the ScanTask + let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.schema.clone()); + let cast_to_schema = match &column_names { + None => Ok(cast_to_schema), + Some(column_names) => Ok(Arc::new( + Schema::new( + cast_to_schema + .names() + .iter() + .filter(|name| column_names.contains(&name.as_str())) + .map(|name| cast_to_schema.get_field(name).unwrap().clone()) + .collect(), + ) + .context(DaftCoreComputeSnafu)?, + )), + }?; + + let table_values = match scan_task.storage_config.as_ref() { + StorageConfig::Native(native_storage_config) => { + let runtime_handle = + daft_io::get_runtime(native_storage_config.multithreaded_io).unwrap(); + let io_config = Arc::new( + native_storage_config + .io_config + .as_ref() + .cloned() + .unwrap_or_default(), + ); + let io_client = + daft_io::get_io_client(native_storage_config.multithreaded_io, io_config).unwrap(); + + match scan_task.file_format_config.as_ref() { + // ******************** + // Native Parquet Reads + // ******************** + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + // TODO(Clark): Support different row group specification per file. 
+ row_groups, + }) => { + let inference_options = + ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); + let urls = urls.collect::>(); + daft_parquet::read::read_parquet_bulk( + urls.as_slice(), + column_names.as_deref(), + None, + scan_task.limit, + row_groups + .as_ref() + .map(|row_groups| vec![row_groups.clone(); urls.len()]), + io_client.clone(), + io_stats, + 8, + runtime_handle, + &inference_options, + ) + .context(DaftCoreComputeSnafu)? + } + + // **************** + // Native CSV Reads + // **************** + FileFormatConfig::Csv(cfg @ CsvSourceConfig { .. }) => { + urls.map(|url| { + daft_csv::read::read_csv( + url, + None, + None, // column_names.clone(), NOTE: `read_csv` seems to be buggy when provided with out-of-order column_names + scan_task.limit, + cfg.has_headers, + Some(cfg.delimiter.as_bytes()[0]), + cfg.double_quote, + io_client.clone(), + io_stats.clone(), + native_storage_config.multithreaded_io, + None, // Allow read_csv to perform its own schema inference + cfg.buffer_size, + cfg.chunk_size, + None, // max_chunks_in_flight + ) + .context(DaftCoreComputeSnafu) + }) + .collect::>>()? + } + + // **************** + // Native JSON Reads + // **************** + FileFormatConfig::Json(_) => { + todo!("TODO: Implement MicroPartition native reads for JSON."); + } + } + } + #[cfg(feature = "python")] + StorageConfig::Python(_) => { + todo!("TODO: Implement Python I/O backend for MicroPartitions.") + } + }; + + let casted_table_values = table_values + .iter() + .map(|tbl| tbl.cast_to_schema(cast_to_schema.as_ref())) + .collect::>>() + .context(DaftCoreComputeSnafu)?; + Ok(casted_table_values) +} + impl MicroPartition { - pub fn new( + pub fn new_unloaded( schema: SchemaRef, - state: TableState, + scan_task: Arc, metadata: TableMetadata, - statistics: Option, + statistics: TableStatistics, ) -> Self { - if let TableState::Unloaded(..) 
= state && statistics.is_none() { - panic!("MicroPartition does not allow the Table without Statistics") + if statistics.columns.len() != schema.fields.len() { + panic!("MicroPartition: TableStatistics and Schema have differing lengths") } - if let Some(stats) = &statistics { - if stats.columns.len() != schema.fields.len() { - panic!("MicroPartition: TableStatistics and Schema have differing lengths") - } - if !stats - .columns - .keys() - .zip(schema.fields.keys()) - .all(|(l, r)| l == r) - { - panic!("MicroPartition: TableStatistics and Schema have different column names\nTableStats:\n{},\nSchema\n{}", stats, schema); - } + if !statistics + .columns + .keys() + .zip(schema.fields.keys()) + .all(|(l, r)| l == r) + { + panic!("MicroPartition: TableStatistics and Schema have different column names\nTableStats:\n{},\nSchema\n{}", statistics, schema); } MicroPartition { schema, - state: Mutex::new(state), + state: Mutex::new(TableState::Unloaded(scan_task)), metadata, + statistics: Some(statistics), + } + } + + pub fn new_loaded( + schema: SchemaRef, + tables: Arc>, + statistics: Option, + ) -> Self { + let tables_len_sum = tables.iter().map(|t| t.len()).sum(); + MicroPartition { + schema, + state: Mutex::new(TableState::Loaded(tables)), + metadata: TableMetadata { + length: tables_len_sum, + }, statistics, } } + pub fn from_scan_task( + scan_task: Arc, + io_stats: Option, + ) -> crate::Result { + let schema = scan_task.schema.clone(); + let statistics = scan_task.statistics.clone(); + match ( + &scan_task.metadata, + &scan_task.statistics, + scan_task.file_format_config.as_ref(), + scan_task.storage_config.as_ref(), + ) { + // CASE: ScanTask provides all required metadata. + // If the scan_task provides metadata (e.g. retrieved from a catalog) we can use it to create an unloaded MicroPartition + (Some(metadata), Some(statistics), _, _) => Ok(Self::new_unloaded( + schema, + scan_task.clone(), + metadata.clone(), + statistics.clone(), + )), + + // CASE: ScanTask does not provide metadata, but the file format supports metadata retrieval + // We can perform an eager **metadata** read to create an unloaded MicroPartition + ( + _, + _, + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + row_groups, + }), + StorageConfig::Native(cfg), + ) => { + let columns = scan_task + .columns + .as_ref() + .map(|cols| cols.iter().map(|s| s.as_str()).collect::>()); + + read_parquet_into_micropartition( + scan_task + .sources + .iter() + .map(|s| s.get_path()) + .collect::>() + .as_slice(), + columns.as_deref(), + None, + scan_task.limit, + row_groups.clone().map(|rg| { + std::iter::repeat(rg) + .take(scan_task.sources.len()) + .collect::>() + }), // HACK: Properly propagate multi-file row_groups + cfg.io_config + .clone() + .map(|c| Arc::new(c.clone())) + .unwrap_or_default(), + io_stats, + if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads + cfg.multithreaded_io, + &ParquetSchemaInferenceOptions { + coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + }, + ) + .context(DaftCoreComputeSnafu) + } + + // CASE: Last resort fallback option + // Perform an eager **data** read + _ => { + let tables = materialize_scan_task(scan_task, None, io_stats)?; + Ok(Self::new_loaded(schema, Arc::new(tables), statistics)) + } + } + } + pub fn empty(schema: Option) -> Self { let schema = schema.unwrap_or(Schema::empty().into()); - - Self::new( - schema, - TableState::Loaded(Arc::new(vec![])), - TableMetadata { length: 0 }, - None, - ) + 
Self::new_loaded(schema, Arc::new(vec![]), None) } pub fn column_names(&self) -> Vec { @@ -146,55 +349,20 @@ impl MicroPartition { io_stats: Option, ) -> crate::Result>> { let mut guard = self.state.lock().unwrap(); - if let TableState::Unloaded(params) = guard.deref() { - let runtime_handle = daft_io::get_runtime(params.multithreaded_io).unwrap(); - let _rt_guard = runtime_handle.enter(); + match guard.deref() { + TableState::Unloaded(scan_task) => { + let table_values = Arc::new(materialize_scan_task( + scan_task.clone(), + Some(self.schema.clone()), + io_stats, + )?); - let table_values: Vec<_> = match ¶ms.format_params { - FormatParams::Parquet { - row_groups, - inference_options, - } => { - let io_client = - daft_io::get_io_client(params.multithreaded_io, params.io_config.clone()) - .unwrap(); - let column_names = params - .columns - .as_ref() - .map(|v| v.iter().map(|s| s.as_ref()).collect::>()); - let urls = params.urls.iter().map(|s| s.as_str()).collect::>(); - let all_tables = daft_parquet::read::read_parquet_bulk( - urls.as_slice(), - column_names.as_deref(), - None, - params.limit, - row_groups.clone(), - io_client.clone(), - io_stats, - 8, - runtime_handle, - inference_options, - ) - .context(DaftCoreComputeSnafu)?; - all_tables - .into_iter() - .map(|t| t.cast_to_schema(&self.schema)) - .collect::>>() - .context(DaftCoreComputeSnafu)? - } - }; - let casted_table_values = table_values - .iter() - .map(|tbl| tbl.cast_to_schema(self.schema.as_ref())) - .collect::>>() - .context(DaftCoreComputeSnafu)?; - *guard = TableState::Loaded(Arc::new(casted_table_values)); - }; + // Cache future accesses by setting the state to TableState::Loaded + *guard = TableState::Loaded(table_values.clone()); - if let TableState::Loaded(tables) = guard.deref() { - Ok(tables.clone()) - } else { - unreachable!() + Ok(table_values) + } + TableState::Loaded(tables) => Ok(tables.clone()), } } @@ -311,11 +479,9 @@ pub(crate) fn read_csv_into_micropartition( .collect::>>()?; // Construct MicroPartition from tables and unioned schema - let total_len = tables.iter().map(|t| t.len()).sum(); - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( unioned_schema.clone(), - TableState::Loaded(Arc::new(tables)), - TableMetadata { length: total_len }, + Arc::new(tables), None, )) } @@ -401,19 +567,35 @@ pub(crate) fn read_parquet_into_micropartition( if let Some(stats) = stats { let owned_urls = uris.iter().map(|s| s.to_string()).collect::>(); - let owned_columns = columns.map(|c| c.iter().map(|s| s.to_string()).collect::>()); - let params = DeferredLoadingParams { - format_params: FormatParams::Parquet { - row_groups, - inference_options: *schema_infer_options, - }, - urls: owned_urls, - io_config: io_config.clone(), - multithreaded_io, - limit: num_rows, - columns: owned_columns, - }; + let daft_schema = Arc::new(daft_schema); + let scan_task = ScanTask::new( + owned_urls + .into_iter() + .map(|url| DataFileSource::AnonymousDataFile { + path: url, + metadata: None, + partition_spec: None, + statistics: None, + }) + .collect::>(), + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit, + row_groups: None, + }) + .into(), + daft_schema.clone(), + StorageConfig::Native( + NativeStorageConfig::new_internal( + multithreaded_io, + Some(io_config.as_ref().clone()), + ) + .into(), + ) + .into(), + columns.map(|cols| Arc::new(cols.iter().map(|v| v.to_string()).collect::>())), + num_rows, + ); let exprs = daft_schema .fields @@ -421,13 +603,13 @@ 
pub(crate) fn read_parquet_into_micropartition( .map(|n| daft_dsl::col(n.as_str())) .collect::>(); // use schema to update stats - let stats = stats.eval_expression_list(exprs.as_slice(), &daft_schema)?; + let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?; - Ok(MicroPartition::new( - Arc::new(daft_schema), - TableState::Unloaded(params), + Ok(MicroPartition::new_unloaded( + scan_task.schema.clone(), + Arc::new(scan_task), TableMetadata { length: total_rows }, - Some(stats), + stats, )) } else { let all_tables = read_parquet_bulk( @@ -446,10 +628,9 @@ pub(crate) fn read_parquet_into_micropartition( .into_iter() .map(|t| t.cast_to_schema(&daft_schema)) .collect::>>()?; - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( Arc::new(daft_schema), - TableState::Loaded(all_tables.into()), - TableMetadata { length: total_rows }, + all_tables.into(), None, )) } diff --git a/src/daft-micropartition/src/ops/agg.rs b/src/daft-micropartition/src/ops/agg.rs index 117395c0fc..695a709e13 100644 --- a/src/daft-micropartition/src/ops/agg.rs +++ b/src/daft-micropartition/src/ops/agg.rs @@ -2,9 +2,7 @@ use common_error::DaftResult; use daft_dsl::Expr; use daft_table::Table; -use crate::micropartition::{MicroPartition, TableState}; - -use daft_stats::TableMetadata; +use crate::micropartition::MicroPartition; impl MicroPartition { pub fn agg(&self, to_agg: &[Expr], group_by: &[Expr]) -> DaftResult { @@ -14,21 +12,17 @@ impl MicroPartition { [] => { let empty_table = Table::empty(Some(self.schema.clone()))?; let agged = empty_table.agg(to_agg, group_by)?; - let agged_len = agged.len(); - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( agged.schema.clone(), - TableState::Loaded(vec![agged].into()), - TableMetadata { length: agged_len }, + vec![agged].into(), None, )) } [t] => { let agged = t.agg(to_agg, group_by)?; - let agged_len = agged.len(); - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( agged.schema.clone(), - TableState::Loaded(vec![agged].into()), - TableMetadata { length: agged_len }, + vec![agged].into(), None, )) } diff --git a/src/daft-micropartition/src/ops/cast_to_schema.rs b/src/daft-micropartition/src/ops/cast_to_schema.rs index 22a6bc5dc8..7cd32ccd9d 100644 --- a/src/daft-micropartition/src/ops/cast_to_schema.rs +++ b/src/daft-micropartition/src/ops/cast_to_schema.rs @@ -20,22 +20,21 @@ impl MicroPartition { let guard = self.state.lock().unwrap(); match guard.deref() { // Replace schema if Unloaded, which should be applied when data is lazily loaded - TableState::Unloaded(params) => Ok(MicroPartition::new( + TableState::Unloaded(scan_task) => Ok(MicroPartition::new_unloaded( schema.clone(), - TableState::Unloaded(params.clone()), + scan_task.clone(), self.metadata.clone(), - pruned_statistics, + pruned_statistics.expect("Unloaded MicroPartition should have statistics"), )), // If Tables are already loaded, we map `Table::cast_to_schema` on each Table - TableState::Loaded(tables) => Ok(MicroPartition::new( + TableState::Loaded(tables) => Ok(MicroPartition::new_loaded( schema.clone(), - TableState::Loaded(Arc::new( + Arc::new( tables .iter() .map(|tbl| tbl.cast_to_schema(schema.as_ref())) .collect::>>()?, - )), - self.metadata.clone(), + ), pruned_statistics, )), } diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index f4f8a1ee81..d193707289 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -5,15 
+5,10 @@ use daft_core::schema::Schema; use daft_dsl::Expr; use snafu::ResultExt; -use crate::{ - micropartition::{MicroPartition, TableState}, - DaftCoreComputeSnafu, -}; +use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; use daft_stats::{ColumnRangeStatistics, TableStatistics}; -use daft_stats::TableMetadata; - fn infer_schema(exprs: &[Expr], schema: &Schema) -> DaftResult { let fields = exprs .iter() @@ -48,10 +43,9 @@ impl MicroPartition { .map(|s| s.eval_expression_list(exprs, &expected_schema)) .transpose()?; - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( expected_schema.into(), - TableState::Loaded(Arc::new(evaluated_tables)), - TableMetadata { length: self.len() }, + Arc::new(evaluated_tables), eval_stats, )) } @@ -87,12 +81,9 @@ impl MicroPartition { } } - let new_len = evaluated_tables.iter().map(|t| t.len()).sum(); - - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( Arc::new(expected_schema), - TableState::Loaded(Arc::new(evaluated_tables)), - TableMetadata { length: new_len }, + Arc::new(evaluated_tables), eval_stats, )) } diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index 91cec19cca..85e5d705f3 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ b/src/daft-micropartition/src/ops/filter.rs @@ -2,15 +2,10 @@ use common_error::DaftResult; use daft_dsl::Expr; use snafu::ResultExt; -use crate::{ - micropartition::{MicroPartition, TableState}, - DaftCoreComputeSnafu, -}; +use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; use daft_stats::TruthValue; -use daft_stats::TableMetadata; - impl MicroPartition { pub fn filter(&self, predicate: &[Expr]) -> DaftResult { if predicate.is_empty() { @@ -37,12 +32,9 @@ impl MicroPartition { .collect::>>() .context(DaftCoreComputeSnafu)?; - let new_len = tables.iter().map(|t| t.len()).sum(); - - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(tables.into()), - TableMetadata { length: new_len }, + tables.into(), self.statistics.clone(), // update these values based off the filter we just ran )) } diff --git a/src/daft-micropartition/src/ops/join.rs b/src/daft-micropartition/src/ops/join.rs index 5e49734d80..71b6a5d2cf 100644 --- a/src/daft-micropartition/src/ops/join.rs +++ b/src/daft-micropartition/src/ops/join.rs @@ -3,12 +3,10 @@ use daft_core::array::ops::DaftCompare; use daft_dsl::Expr; use daft_table::infer_join_schema; -use crate::micropartition::{MicroPartition, TableState}; +use crate::micropartition::MicroPartition; use daft_stats::TruthValue; -use daft_stats::TableMetadata; - impl MicroPartition { pub fn join(&self, right: &Self, left_on: &[Expr], right_on: &[Expr]) -> DaftResult { let join_schema = infer_join_schema(&self.schema, &right.schema, left_on, right_on)?; @@ -43,11 +41,9 @@ impl MicroPartition { ([], _) | (_, []) => Ok(Self::empty(Some(join_schema.into()))), ([lt], [rt]) => { let joined_table = lt.join(rt, left_on, right_on)?; - let joined_len = joined_table.len(); - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( join_schema.into(), - TableState::Loaded(vec![joined_table].into()), - TableMetadata { length: joined_len }, + vec![joined_table].into(), None, )) } diff --git a/src/daft-micropartition/src/ops/partition.rs b/src/daft-micropartition/src/ops/partition.rs index 2b90216ff8..4b4af2150e 100644 --- a/src/daft-micropartition/src/ops/partition.rs +++ b/src/daft-micropartition/src/ops/partition.rs @@ -4,9 +4,7 @@ use common_error::DaftResult; use daft_dsl::Expr; use 
daft_table::Table; -use crate::micropartition::{MicroPartition, TableState}; - -use daft_stats::TableMetadata; +use crate::micropartition::MicroPartition; fn transpose2(v: Vec>) -> Vec> { if v.is_empty() { @@ -33,11 +31,9 @@ impl MicroPartition { Ok(part_tables .into_iter() .map(|v| { - let new_len = v.iter().map(|t| t.len()).sum(); - MicroPartition::new( + MicroPartition::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(v)), - TableMetadata { length: new_len }, + Arc::new(v), self.statistics.clone(), ) }) diff --git a/src/daft-micropartition/src/ops/slice.rs b/src/daft-micropartition/src/ops/slice.rs index 63e74bffbb..37587fd919 100644 --- a/src/daft-micropartition/src/ops/slice.rs +++ b/src/daft-micropartition/src/ops/slice.rs @@ -1,8 +1,6 @@ use common_error::DaftResult; -use crate::micropartition::{MicroPartition, TableState}; - -use daft_stats::TableMetadata; +use crate::micropartition::MicroPartition; impl MicroPartition { pub fn slice(&self, start: usize, end: usize) -> DaftResult { @@ -34,14 +32,11 @@ impl MicroPartition { } } - let new_len = slices_tables.iter().map(|t| t.len()).sum(); - - Ok(MicroPartition { - schema: self.schema.clone(), - state: TableState::Loaded(slices_tables.into()).into(), - metadata: TableMetadata { length: new_len }, - statistics: self.statistics.clone(), - }) + Ok(MicroPartition::new_loaded( + self.schema.clone(), + slices_tables.into(), + self.statistics.clone(), + )) } pub fn head(&self, num: usize) -> DaftResult { diff --git a/src/daft-micropartition/src/ops/sort.rs b/src/daft-micropartition/src/ops/sort.rs index 527a2abae9..fe0d7921ac 100644 --- a/src/daft-micropartition/src/ops/sort.rs +++ b/src/daft-micropartition/src/ops/sort.rs @@ -5,7 +5,7 @@ use daft_core::Series; use daft_dsl::Expr; use daft_table::Table; -use crate::micropartition::{MicroPartition, TableState}; +use crate::micropartition::MicroPartition; impl MicroPartition { pub fn sort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult { @@ -14,10 +14,9 @@ impl MicroPartition { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { let sorted = single.sort(sort_keys, descending)?; - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(vec![sorted])), - self.metadata.clone(), + Arc::new(vec![sorted]), self.statistics.clone(), )) } diff --git a/src/daft-micropartition/src/ops/take.rs b/src/daft-micropartition/src/ops/take.rs index 7212630514..1c2d986673 100644 --- a/src/daft-micropartition/src/ops/take.rs +++ b/src/daft-micropartition/src/ops/take.rs @@ -4,8 +4,7 @@ use common_error::DaftResult; use daft_core::Series; use daft_table::Table; -use crate::micropartition::{MicroPartition, TableState}; -use daft_stats::TableMetadata; +use crate::micropartition::MicroPartition; impl MicroPartition { pub fn take(&self, idx: &Series) -> DaftResult { @@ -15,19 +14,17 @@ impl MicroPartition { [] => { let empty_table = Table::empty(Some(self.schema.clone()))?; let taken = empty_table.take(idx)?; - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: idx.len() }, + Arc::new(vec![taken]), self.statistics.clone(), )) } [single] => { let taken = single.take(idx)?; - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: idx.len() }, + Arc::new(vec![taken]), self.statistics.clone(), )) } @@ -42,11 +39,9 @@ impl MicroPartition { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { 
let taken = single.sample(num)?; - let taken_len = taken.len(); - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: taken_len }, + Arc::new(vec![taken]), self.statistics.clone(), )) } @@ -60,11 +55,9 @@ impl MicroPartition { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { let taken = single.quantiles(num)?; - let taken_len = taken.len(); - Ok(Self::new( + Ok(Self::new_loaded( self.schema.clone(), - TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: taken_len }, + Arc::new(vec![taken]), self.statistics.clone(), )) } diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 4f430eb237..fb2a82c243 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -1,5 +1,3 @@ -#![allow(unused)] // MAKE SURE TO REMOVE THIS - use std::{ ops::Deref, sync::{Arc, Mutex}, @@ -7,25 +5,19 @@ use std::{ use common_error::DaftResult; use daft_core::{ - ffi, python::{datatype::PyTimeUnit, schema::PySchema, PySeries}, schema::Schema, Series, }; use daft_dsl::python::PyExpr; -use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; +use daft_io::{python::IOConfig, IOStatsContext}; use daft_parquet::read::ParquetSchemaInferenceOptions; +use daft_scan::{python::pylib::PyScanTask, ScanTask}; use daft_stats::TableStatistics; -use daft_table::{python::PyTable, Table}; -use indexmap::IndexMap; -use pyo3::{ - exceptions::PyValueError, - prelude::*, - types::{PyBytes, PyDict, PyList, PyTuple}, - Python, -}; +use daft_table::python::PyTable; +use pyo3::{exceptions::PyValueError, prelude::*, types::PyBytes, Python}; -use crate::micropartition::{DeferredLoadingParams, MicroPartition, TableState}; +use crate::micropartition::{MicroPartition, TableState}; use daft_stats::TableMetadata; use pyo3::PyTypeInfo; @@ -77,18 +69,20 @@ impl PyMicroPartition { } // Creation Methods + #[staticmethod] + pub fn from_scan_task(scan_task: PyScanTask) -> PyResult { + Ok(MicroPartition::from_scan_task(scan_task.into(), None)?.into()) + } + #[staticmethod] pub fn from_tables(tables: Vec) -> PyResult { match &tables[..] { [] => Ok(MicroPartition::empty(None).into()), [first, ..] => { let tables = Arc::new(tables.iter().map(|t| t.table.clone()).collect::>()); - Ok(MicroPartition::new( + Ok(MicroPartition::new_loaded( first.table.schema.clone(), - TableState::Loaded(tables.clone()), - TableMetadata { - length: tables.iter().map(|t| t.len()).sum(), - }, + tables, // Don't compute statistics if data is already materialized None, ) @@ -118,18 +112,11 @@ impl PyMicroPartition { .map(|rb| daft_table::ffi::record_batches_to_table(py, &[rb], schema.schema.clone())) .collect::>>()?; - let total_len = tables.iter().map(|tbl| tbl.len()).sum(); - Ok(MicroPartition::new( - schema.schema.clone(), - TableState::Loaded(Arc::new(tables)), - TableMetadata { length: total_len }, - None, - ) - .into()) + Ok(MicroPartition::new_loaded(schema.schema.clone(), Arc::new(tables), None).into()) } // Export Methods - pub fn to_table(&self, py: Python) -> PyResult { + pub fn to_table(&self) -> PyResult { let concatted = self.inner.concat_or_get()?; match &concatted.as_ref()[..] 
{ [] => PyTable::empty(Some(self.schema()?)), @@ -469,25 +456,24 @@ impl PyMicroPartition { #[staticmethod] pub fn _from_unloaded_table_state( - py: Python, schema_bytes: &PyBytes, - loading_params_bytes: &PyBytes, + loading_scan_task_bytes: &PyBytes, metadata_bytes: &PyBytes, statistics_bytes: &PyBytes, ) -> PyResult { let schema = bincode::deserialize::(schema_bytes.as_bytes()).unwrap(); - let params = - bincode::deserialize::(loading_params_bytes.as_bytes()).unwrap(); + let scan_task = + bincode::deserialize::(loading_scan_task_bytes.as_bytes()).unwrap(); let metadata = bincode::deserialize::(metadata_bytes.as_bytes()).unwrap(); let statistics = bincode::deserialize::>(statistics_bytes.as_bytes()).unwrap(); - Ok(MicroPartition::new( - schema.into(), - TableState::Unloaded(params), + Ok(MicroPartition { + schema: Arc::new(schema), + state: Mutex::new(TableState::Unloaded(Arc::new(scan_task))), metadata, statistics, - ) + } .into()) } @@ -513,12 +499,12 @@ impl PyMicroPartition { }) .collect::>>()?; - Ok(MicroPartition::new( - schema.into(), - TableState::Loaded(tables.into()), + Ok(MicroPartition { + schema: Arc::new(schema), + state: Mutex::new(TableState::Loaded(Arc::new(tables))), metadata, statistics, - ) + } .into()) } diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index 079fd75715..f9c683ef77 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -5,6 +5,7 @@ common-error = {path = "../common/error", default-features = false} common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-scan = {path = "../daft-scan", default-features = false} daft-table = {path = "../daft-table", default-features = false} indexmap = {workspace = true} log = {workspace = true} diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index bd89bb720d..5907c23bd9 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -7,24 +7,27 @@ use crate::{ planner::plan, sink_info::{OutputFileInfo, SinkInfo}, source_info::{ - ExternalInfo as ExternalSourceInfo, FileFormatConfig, FileInfos as InputFileInfos, - PyStorageConfig, SourceInfo, StorageConfig, + ExternalInfo as ExternalSourceInfo, FileInfos as InputFileInfos, LegacyExternalInfo, + SourceInfo, }, - FileFormat, JoinType, PartitionScheme, PhysicalPlanScheduler, ResourceRequest, + JoinType, PartitionScheme, PhysicalPlanScheduler, ResourceRequest, }; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; use daft_core::{datatypes::Field, schema::Schema, DataType}; use daft_dsl::Expr; +use daft_scan::{ + file_format::{FileFormat, FileFormatConfig}, + storage_config::{PyStorageConfig, StorageConfig}, + ScanExternalInfo, ScanOperatorRef, +}; #[cfg(feature = "python")] use { - crate::{ - physical_plan::PhysicalPlan, - source_info::{InMemoryInfo, PyFileFormatConfig}, - }, + crate::{physical_plan::PhysicalPlan, source_info::InMemoryInfo}, daft_core::python::schema::PySchema, daft_dsl::python::PyExpr, + daft_scan::{file_format::PyFileFormatConfig, python::pylib::ScanOperatorHandle}, pyo3::prelude::*, }; @@ -64,6 +67,24 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } + pub fn table_scan_with_scan_operator( + scan_operator: ScanOperatorRef, + schema_hint: Option, + ) -> DaftResult { + let schema = schema_hint.unwrap_or_else(|| scan_operator.0.schema()); + let partitioning_keys = scan_operator.0.partitioning_keys(); + let 
source_info = + SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo::new( + scan_operator.clone(), + schema.clone(), + partitioning_keys.into(), + Default::default(), + ))); + let logical_plan: LogicalPlan = + logical_ops::Source::new(schema.clone(), source_info.into(), None).into(); + Ok(logical_plan.into()) + } + pub fn table_scan( file_infos: InputFileInfos, schema: Arc, @@ -80,12 +101,13 @@ impl LogicalPlanBuilder { storage_config: Arc, limit: Option, ) -> DaftResult { - let source_info = SourceInfo::ExternalInfo(ExternalSourceInfo::new( - schema.clone(), - file_infos.into(), - file_format_config, - storage_config, - )); + let source_info = + SourceInfo::ExternalInfo(ExternalSourceInfo::Legacy(LegacyExternalInfo::new( + schema.clone(), + file_infos.into(), + file_format_config, + storage_config, + ))); let logical_plan: LogicalPlan = logical_ops::Source::new(schema.clone(), source_info.into(), limit).into(); Ok(logical_plan.into()) @@ -265,6 +287,18 @@ impl PyLogicalPlanBuilder { .into()) } + #[staticmethod] + pub fn table_scan_with_scan_operator( + scan_operator: ScanOperatorHandle, + schema_hint: Option, + ) -> PyResult { + Ok(LogicalPlanBuilder::table_scan_with_scan_operator( + scan_operator.into(), + schema_hint.map(|s| s.into()), + )? + .into()) + } + #[staticmethod] pub fn table_scan( file_infos: InputFileInfos, diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index a72214e4d8..ec8b4866a3 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -1,5 +1,6 @@ #![feature(let_chains)] #![feature(assert_matches)] +#![feature(if_let_guard)] mod builder; mod display; @@ -18,18 +19,23 @@ mod source_info; mod test; pub use builder::{LogicalPlanBuilder, PyLogicalPlanBuilder}; +use daft_scan::{ + file_format::{ + CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig, + }, + storage_config::{NativeStorageConfig, PyStorageConfig}, +}; pub use join::JoinType; pub use logical_plan::LogicalPlan; pub use partitioning::{PartitionScheme, PartitionSpec}; pub use physical_plan::PhysicalPlanScheduler; pub use resource_request::ResourceRequest; -pub use source_info::{ - CsvSourceConfig, FileFormat, FileInfo, FileInfos, JsonSourceConfig, NativeStorageConfig, - ParquetSourceConfig, PyFileFormatConfig, PyStorageConfig, -}; +pub use source_info::{FileInfo, FileInfos}; #[cfg(feature = "python")] -use {pyo3::prelude::*, source_info::PythonStorageConfig}; +use daft_scan::storage_config::PythonStorageConfig; +#[cfg(feature = "python")] +use pyo3::prelude::*; #[cfg(feature = "python")] pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { diff --git a/src/daft-plan/src/logical_ops/source.rs b/src/daft-plan/src/logical_ops/source.rs index 7e3738dd05..2c3830f0c0 100644 --- a/src/daft-plan/src/logical_ops/source.rs +++ b/src/daft-plan/src/logical_ops/source.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; +use daft_scan::ScanExternalInfo; -use crate::source_info::{ExternalInfo, SourceInfo}; +use crate::source_info::{ExternalInfo, LegacyExternalInfo, SourceInfo}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Source { @@ -14,6 +15,8 @@ pub struct Source { /// Information about the source data location. pub source_info: Arc, + // TODO(Clark): Replace these pushdown fields with the Pushdown struct, where the Pushdown struct would exist + // on the LegacyExternalInfo struct in SourceInfo. /// Optional filters to apply to the source data. 
pub filters: Vec, /// Optional number of rows to read. @@ -56,12 +59,12 @@ impl Source { let mut res = vec![]; match self.source_info.as_ref() { - SourceInfo::ExternalInfo(ExternalInfo { + SourceInfo::ExternalInfo(ExternalInfo::Legacy(LegacyExternalInfo { source_schema, file_infos, file_format_config, storage_config, - }) => { + })) => { res.push(format!("Source: {}", file_format_config.var_name())); res.push(format!( "File paths = [{}]", @@ -71,6 +74,18 @@ impl Source { res.push(format!("Format-specific config = {:?}", file_format_config)); res.push(format!("Storage config = {:?}", storage_config)); } + SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { + source_schema, + scan_op, + partitioning_keys, + pushdowns, + })) => { + res.push("Source:".to_string()); + res.push(format!("Scan op = {}", scan_op)); + res.push(format!("File schema = {}", source_schema.short_string())); + res.push(format!("Partitioning keys = {:?}", partitioning_keys)); + res.push(format!("Scan pushdowns = {:?}", pushdowns)); + } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(_) => {} } diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index 346c93d523..d020c0eb99 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -536,7 +536,7 @@ mod tests { let expected = "\ Filter: [[[col(a) < lit(2)] | lit(false)] | lit(false)] & lit(true)\ \n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64)"; assert_eq!(opt_plan.repr_indent(), expected); Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index a5a59a5099..02db0ffb1f 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -98,7 +98,7 @@ mod tests { .build(); let expected = "\ Repartition: Scheme = Hash, Number of partitions = 5, Partition by = col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 6fda32511a..0bd738e574 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -261,7 +261,7 @@ mod tests { .build(); let expected = "\ Filter: [col(b) == lit(\"foo\")] & [col(a) < lit(2)]\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { 
io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -279,7 +279,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -297,7 +297,7 @@ mod tests { let expected = "\ Project: col(a), col(b)\ \n Filter: [col(a) < lit(2)] & [col(b) == lit(\"foo\")]\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -317,7 +317,7 @@ mod tests { let expected = "\ Filter: col(a) < lit(2)\ \n Project: col(a) + lit(1)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -338,7 +338,7 @@ mod tests { let expected = "\ Project: col(a) + lit(1)\ \n Filter: [col(a) + lit(1)] < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -356,7 +356,7 @@ mod tests { let expected = "\ Sort: Sort by = (col(a), descending)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; // TODO(Clark): For 
tests in which we only care about reordering of operators, maybe switch to a form that leverages the single-node display? // let expected = format!("{sort}\n {filter}\n {source}"); assert_optimized_plan_eq(plan, expected)?; @@ -376,7 +376,7 @@ mod tests { let expected = "\ Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -395,9 +395,9 @@ mod tests { let expected = "\ Concat\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -423,8 +423,8 @@ mod tests { let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Utf8), c (Float64)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)\ - \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Utf8), c (Float64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -449,9 +449,9 @@ mod tests { .build(); let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Utf8), c (Float64)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output 
schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ \n Filter: col(c) < lit(2.0)\ - \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Utf8), c (Float64)"; + \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -475,9 +475,9 @@ mod tests { let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Int64), c (Float64)\ \n Filter: col(b) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64), c (Float64)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64), c (Float64)\ \n Filter: col(b) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Int64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 73085c1a88..d5266a4ddb 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -1,8 +1,13 @@ use std::sync::Arc; use common_error::DaftResult; +use daft_scan::{Pushdowns, ScanExternalInfo}; -use crate::{logical_ops::Limit as LogicalLimit, source_info::SourceInfo, LogicalPlan}; +use crate::{ + logical_ops::Limit as LogicalLimit, + source_info::{ExternalInfo, SourceInfo}, + LogicalPlan, +}; use super::{ApplyOrder, OptimizerRule, Transformed}; @@ -41,20 +46,44 @@ impl OptimizerRule for PushDownLimit { // Limit pushdown is not supported for in-memory sources. #[cfg(feature = "python")] (SourceInfo::InMemoryInfo(_), _) => Ok(Transformed::No(plan)), + + // Legacy external info handling. 
+ // Do not pushdown if Source node is already more limited than `limit` - (SourceInfo::ExternalInfo(_), Some(existing_source_limit)) + (SourceInfo::ExternalInfo(ExternalInfo::Legacy(_)), Some(existing_source_limit)) if (existing_source_limit <= limit) => { Ok(Transformed::No(plan)) } // Pushdown limit into the Source node as a "local" limit - (SourceInfo::ExternalInfo(_), _) => { + (SourceInfo::ExternalInfo(ExternalInfo::Legacy(_)), _) => { let new_source = LogicalPlan::Source(source.with_limit(Some(limit))).into(); let limit_with_local_limited_source = plan.with_new_children(&[new_source]); Ok(Transformed::Yes(limit_with_local_limited_source)) } + + // Scan operator external info handling. + + // Do not pushdown if Source node is already more limited than `limit` + (SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { pushdowns: Pushdowns { limit: existing_source_limit, .. }, .. })), _) + if let Some(existing_source_limit) = existing_source_limit && existing_source_limit <= &limit => + { + Ok(Transformed::No(plan)) + } + // Pushdown limit into the Source node as a "local" limit + (SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { scan_op, .. })), _) => { + let new_source = + LogicalPlan::Source(source.with_limit(Some(limit))).into(); + let out_plan = if scan_op.0.can_absorb_limit() { + // Scan can fully absorb the limit, so we can drop the Limit op from the logical plan. + new_source + } else { + plan.with_new_children(&[new_source]) + }; + Ok(Transformed::Yes(out_plan)) + } } } _ => Ok(Transformed::No(plan)), @@ -122,7 +151,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8), Limit = 5"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -143,7 +172,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 3"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8), Limit = 3"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -164,7 +193,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8), Limit = 5"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -182,7 +211,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n . 
Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; + \n . Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -202,7 +231,7 @@ mod tests { let expected = "\ Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\ \n Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8), Limit = 5"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -222,7 +251,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8), Limit = 5"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index c80e7e5fbd..6ebe73c157 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -4,10 +4,12 @@ use common_error::DaftResult; use daft_core::schema::Schema; use daft_dsl::{optimization::replace_columns_with_expressions, Expr}; +use daft_scan::ScanExternalInfo; use indexmap::IndexSet; use crate::{ logical_ops::{Aggregate, Project, Source}, + source_info::{ExternalInfo, SourceInfo}, LogicalPlan, ResourceRequest, }; @@ -153,8 +155,25 @@ impl PushDownProjection { }) .collect::>(); let schema = Schema::new(pruned_upstream_schema)?; - let new_source: LogicalPlan = - Source::new(schema.into(), source.source_info.clone(), source.limit).into(); + let new_source: LogicalPlan = match source.source_info.as_ref() { + SourceInfo::ExternalInfo(ExternalInfo::Scan(scan_external_info)) => { + Source::new( + schema.into(), + Arc::new(SourceInfo::ExternalInfo(ExternalInfo::Scan( + ScanExternalInfo { + pushdowns: scan_external_info.pushdowns.with_columns(Some( + Arc::new(required_columns.iter().cloned().collect()), + )), + ..scan_external_info.clone() + }, + ))), + source.limit, + ) + .into() + } + _ => Source::new(schema.into(), source.source_info.clone(), source.limit) + .into(), + }; let new_plan = plan.with_new_children(&[new_source.into()]); // Retry optimization now that the upstream node is different. 
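The filter, limit, and projection rules above all fold their state into the `Pushdowns` struct carried by `ScanExternalInfo` (defined further down in this patch, in `src/daft-scan/src/lib.rs`). A rough sketch of how those refinements are meant to compose, assuming the `Arc`-wrapped column list and `usize` limit used by this PR (the function name is made up):

```rust
// Sketch only: mirrors how the optimizer rules above refine a scan's pushdowns.
use std::sync::Arc;

use daft_scan::Pushdowns;

fn refine_pushdowns() -> Pushdowns {
    // Start with no pushdowns at all.
    let pushdowns = Pushdowns::default();
    // A projection pushdown narrows the columns the scan must materialize...
    let pushdowns =
        pushdowns.with_columns(Some(Arc::new(vec!["a".to_string(), "b".to_string()])));
    // ...and a limit pushdown caps the number of rows read from the source.
    pushdowns.with_limit(Some(5))
}
```

Each `with_*` helper returns a new `Pushdowns`, so a rule can refine the pushdown state without mutating the plan node it matched on.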
@@ -560,7 +579,7 @@ mod tests { let expected = "\ Project: [col(a) + lit(1)] + lit(3), col(b) + lit(2), col(a) + lit(4)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) } @@ -576,7 +595,7 @@ mod tests { .build(); let expected = "\ - Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; + Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -593,7 +612,7 @@ mod tests { let expected = "\ Project: col(b), col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -611,7 +630,7 @@ mod tests { let expected = "\ Project: col(b) + lit(3)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -637,7 +656,7 @@ mod tests { let expected = "\ Project: col(a), col(b), col(b) AS c\ \n Project: col(b) + lit(3), col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -658,7 +677,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Aggregation: mean(col(a)), Group by = col(c), Output schema = c (Int64), a (Float64)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), c (Int64)"; + \n Source: Json, 
File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), c (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -679,7 +698,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Filter: col(b)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Boolean)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Boolean)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) diff --git a/src/daft-plan/src/physical_ops/csv.rs b/src/daft-plan/src/physical_ops/csv.rs index cccedd45d1..8cb99846b8 100644 --- a/src/daft-plan/src/physical_ops/csv.rs +++ b/src/daft-plan/src/physical_ops/csv.rs @@ -4,7 +4,7 @@ use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; use crate::{ - physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, + physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::LegacyExternalInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TabularScanCsv { pub projection_schema: SchemaRef, - pub external_info: ExternalInfo, + pub external_info: LegacyExternalInfo, pub partition_spec: Arc, pub limit: Option, pub filters: Vec, @@ -21,7 +21,7 @@ pub struct TabularScanCsv { impl TabularScanCsv { pub(crate) fn new( projection_schema: SchemaRef, - external_info: ExternalInfo, + external_info: LegacyExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, diff --git a/src/daft-plan/src/physical_ops/json.rs b/src/daft-plan/src/physical_ops/json.rs index 0d8fce535f..80249f30b7 100644 --- a/src/daft-plan/src/physical_ops/json.rs +++ b/src/daft-plan/src/physical_ops/json.rs @@ -4,7 +4,7 @@ use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; use crate::{ - physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, + physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::LegacyExternalInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularScanJson { pub projection_schema: SchemaRef, - pub external_info: ExternalInfo, + pub external_info: LegacyExternalInfo, pub partition_spec: Arc, pub limit: Option, pub filters: Vec, @@ -21,7 +21,7 @@ pub struct TabularScanJson { impl TabularScanJson { pub(crate) fn new( projection_schema: SchemaRef, - external_info: ExternalInfo, + external_info: LegacyExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, diff --git a/src/daft-plan/src/physical_ops/mod.rs b/src/daft-plan/src/physical_ops/mod.rs index d1e12d6a69..09dc707511 100644 --- a/src/daft-plan/src/physical_ops/mod.rs +++ b/src/daft-plan/src/physical_ops/mod.rs @@ -14,6 +14,7 @@ mod limit; mod parquet; mod project; mod reduce; +mod scan; mod sort; mod split; @@ -33,5 +34,6 @@ pub use limit::Limit; pub use parquet::{TabularScanParquet, TabularWriteParquet}; pub use project::Project; pub use 
reduce::ReduceMerge; +pub use scan::TabularScan; pub use sort::Sort; pub use split::Split; diff --git a/src/daft-plan/src/physical_ops/parquet.rs b/src/daft-plan/src/physical_ops/parquet.rs index f461432d3f..fdf1241f64 100644 --- a/src/daft-plan/src/physical_ops/parquet.rs +++ b/src/daft-plan/src/physical_ops/parquet.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use crate::{ physical_plan::PhysicalPlan, sink_info::OutputFileInfo, - source_info::ExternalInfo as ExternalSourceInfo, PartitionSpec, + source_info::LegacyExternalInfo as ExternalSourceInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; diff --git a/src/daft-plan/src/physical_ops/scan.rs b/src/daft-plan/src/physical_ops/scan.rs new file mode 100644 index 0000000000..b4a5261d70 --- /dev/null +++ b/src/daft-plan/src/physical_ops/scan.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use daft_scan::ScanTask; + +use crate::PartitionSpec; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TabularScan { + pub scan_tasks: Vec>, + pub partition_spec: Arc, +} + +impl TabularScan { + pub(crate) fn new(scan_tasks: Vec, partition_spec: Arc) -> Self { + Self { + scan_tasks: scan_tasks.into_iter().map(Arc::new).collect(), + partition_spec, + } + } +} diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index eca30d3869..b77320fe55 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -2,15 +2,17 @@ use { crate::{ sink_info::OutputFileInfo, - source_info::{ - ExternalInfo, FileFormat, FileFormatConfig, FileInfos, InMemoryInfo, - PyFileFormatConfig, PyStorageConfig, StorageConfig, - }, + source_info::{FileInfos, InMemoryInfo, LegacyExternalInfo}, }, daft_core::python::schema::PySchema, daft_core::schema::SchemaRef, daft_dsl::python::PyExpr, daft_dsl::Expr, + daft_scan::{ + file_format::{FileFormat, FileFormatConfig, PyFileFormatConfig}, + python::pylib::PyScanTask, + storage_config::{PyStorageConfig, StorageConfig}, + }, pyo3::{ pyclass, pymethods, types::PyBytes, PyObject, PyRef, PyRefMut, PyResult, PyTypeInfo, Python, ToPyObject, @@ -32,6 +34,7 @@ pub enum PhysicalPlan { TabularScanParquet(TabularScanParquet), TabularScanCsv(TabularScanCsv), TabularScanJson(TabularScanJson), + TabularScan(TabularScan), Project(Project), Filter(Filter), Limit(Limit), @@ -58,6 +61,7 @@ impl PhysicalPlan { match self { #[cfg(feature = "python")] Self::InMemoryScan(InMemoryScan { partition_spec, .. }) => partition_spec.clone(), + Self::TabularScan(TabularScan { partition_spec, .. }) => partition_spec.clone(), Self::TabularScanParquet(TabularScanParquet { partition_spec, .. }) => { partition_spec.clone() } @@ -303,10 +307,20 @@ impl PhysicalPlan { .call1((partition_iter,))?; Ok(py_iter.into()) } + PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => { + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "scan_with_tasks"))? 
+ .call1((scan_tasks + .iter() + .map(|scan_task| PyScanTask(scan_task.clone())) + .collect::>(),))?; + Ok(py_iter.into()) + } PhysicalPlan::TabularScanParquet(TabularScanParquet { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, @@ -328,7 +342,7 @@ impl PhysicalPlan { PhysicalPlan::TabularScanCsv(TabularScanCsv { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, @@ -350,7 +364,7 @@ impl PhysicalPlan { PhysicalPlan::TabularScanJson(TabularScanJson { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 8410ad0e1b..3429d72959 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -5,6 +5,8 @@ use std::{cmp::max, collections::HashMap}; use common_error::DaftResult; use daft_core::count_mode::CountMode; use daft_dsl::Expr; +use daft_scan::file_format::FileFormatConfig; +use daft_scan::ScanExternalInfo; use crate::logical_ops::{ Aggregate as LogicalAggregate, Concat as LogicalConcat, Distinct as LogicalDistinct, @@ -15,7 +17,7 @@ use crate::logical_ops::{ use crate::logical_plan::LogicalPlan; use crate::physical_plan::PhysicalPlan; use crate::sink_info::{OutputFileInfo, SinkInfo}; -use crate::source_info::{ExternalInfo as ExternalSourceInfo, FileFormatConfig, SourceInfo}; +use crate::source_info::{ExternalInfo as ExternalSourceInfo, LegacyExternalInfo, SourceInfo}; use crate::{physical_ops::*, PartitionSpec}; use crate::{FileFormat, PartitionScheme}; @@ -31,13 +33,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { limit, filters, }) => match source_info.as_ref() { - SourceInfo::ExternalInfo( - ext_info @ ExternalSourceInfo { + SourceInfo::ExternalInfo(ExternalSourceInfo::Legacy( + ext_info @ LegacyExternalInfo { file_format_config, file_infos, .. }, - ) => { + )) => { let partition_spec = Arc::new(PartitionSpec::new_internal( PartitionScheme::Unknown, file_infos.len(), @@ -73,6 +75,26 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { } } } + SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo { + pushdowns, + scan_op, + .. + })) => { + let scan_tasks = scan_op + .0 + .to_scan_tasks(pushdowns.clone())? 
+ .collect::>>()?; + + let partition_spec = Arc::new(PartitionSpec::new_internal( + PartitionScheme::Unknown, + scan_tasks.len(), + None, + )); + Ok(PhysicalPlan::TabularScan(TabularScan::new( + scan_tasks, + partition_spec, + ))) + } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(mem_info) => { let scan = PhysicalPlan::InMemoryScan(InMemoryScan::new( diff --git a/src/daft-plan/src/source_info/mod.rs b/src/daft-plan/src/source_info/mod.rs index d2fc46c329..985940196e 100644 --- a/src/daft-plan/src/source_info/mod.rs +++ b/src/daft-plan/src/source_info/mod.rs @@ -1,23 +1,14 @@ -pub mod file_format; pub mod file_info; -#[cfg(feature = "python")] -mod py_object_serde; -pub mod storage_config; - use daft_core::schema::SchemaRef; -pub use file_format::{ - CsvSourceConfig, FileFormat, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig, - PyFileFormatConfig, -}; +use daft_scan::file_format::FileFormatConfig; +use daft_scan::storage_config::StorageConfig; +use daft_scan::ScanExternalInfo; pub use file_info::{FileInfo, FileInfos}; use serde::{Deserialize, Serialize}; use std::{hash::Hash, sync::Arc}; #[cfg(feature = "python")] -pub use storage_config::PythonStorageConfig; -pub use storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig}; -#[cfg(feature = "python")] use { - py_object_serde::{deserialize_py_object, serialize_py_object}, + daft_scan::py_object_serde::{deserialize_py_object, serialize_py_object}, pyo3::{PyObject, Python}, std::hash::Hasher, }; @@ -89,15 +80,21 @@ impl Hash for InMemoryInfo { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ExternalInfo { + Scan(ScanExternalInfo), + Legacy(LegacyExternalInfo), +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ExternalInfo { +pub struct LegacyExternalInfo { pub source_schema: SchemaRef, pub file_infos: Arc, pub file_format_config: Arc, pub storage_config: Arc, } -impl ExternalInfo { +impl LegacyExternalInfo { pub fn new( source_schema: SchemaRef, file_infos: Arc, diff --git a/src/daft-plan/src/test/mod.rs b/src/daft-plan/src/test/mod.rs index 1b7406faaf..9baf63af08 100644 --- a/src/daft-plan/src/test/mod.rs +++ b/src/daft-plan/src/test/mod.rs @@ -1,11 +1,10 @@ use std::sync::Arc; use daft_core::{datatypes::Field, schema::Schema}; +use daft_scan::{file_format::FileFormatConfig, storage_config::StorageConfig}; use crate::{ - builder::LogicalPlanBuilder, - source_info::{FileFormatConfig, FileInfos, StorageConfig}, - JsonSourceConfig, NativeStorageConfig, + builder::LogicalPlanBuilder, source_info::FileInfos, JsonSourceConfig, NativeStorageConfig, }; /// Create a dummy scan node containing the provided fields in its schema. 
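For readers following the plan-side changes: `ExternalInfo` is now a two-variant enum, with `Legacy` carrying the old file-listing metadata and `Scan` carrying a `ScanExternalInfo` (scan operator handle plus accumulated pushdowns), so downstream code has to branch on it. A crate-internal illustration; `describe` is a hypothetical helper, not part of this patch:

```rust
// Sketch only (inside daft-plan): branching on the Legacy vs. Scan split.
use daft_scan::ScanExternalInfo;

use crate::source_info::{ExternalInfo, LegacyExternalInfo};

fn describe(info: &ExternalInfo) -> String {
    match info {
        // Old path: explicit file listing plus format/storage configs.
        ExternalInfo::Legacy(LegacyExternalInfo { file_infos, .. }) => {
            format!("legacy scan over {} files", file_infos.len())
        }
        // New path: an opaque ScanOperator handle plus its accumulated pushdowns.
        ExternalInfo::Scan(ScanExternalInfo { scan_op, pushdowns, .. }) => {
            format!("scan operator {scan_op} with pushdowns {pushdowns:?}")
        }
    }
}
```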
@@ -15,7 +14,7 @@ pub fn dummy_scan_node(fields: Vec) -> LogicalPlanBuilder { FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), schema, FileFormatConfig::Json(JsonSourceConfig {}).into(), - StorageConfig::Native(NativeStorageConfig::new_internal(None).into()).into(), + StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), ) .unwrap() } @@ -27,7 +26,7 @@ pub fn dummy_scan_node_with_limit(fields: Vec, limit: Option) -> L FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), schema, FileFormatConfig::Json(JsonSourceConfig {}).into(), - StorageConfig::Native(NativeStorageConfig::new_internal(None).into()).into(), + StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), limit, ) .unwrap() diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml index 8603ea3dc6..9573f3c098 100644 --- a/src/daft-scan/Cargo.toml +++ b/src/daft-scan/Cargo.toml @@ -1,5 +1,7 @@ [dependencies] +bincode = {workspace = true} common-error = {path = "../common/error", default-features = false} +common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} @@ -10,12 +12,13 @@ daft-table = {path = "../daft-table", default-features = false} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true} serde = {workspace = true} +serde_json = {workspace = true} snafu = {workspace = true} tokio = {workspace = true} [features] default = ["python"] -python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-stats/python"] +python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-stats/python", "common-io-config/python"] [package] edition = {workspace = true} diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs index 4b1f31d55b..76f1768c2f 100644 --- a/src/daft-scan/src/anonymous.rs +++ b/src/daft-scan/src/anonymous.rs @@ -1,26 +1,32 @@ -use std::fmt::Display; +use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; +use crate::{ + file_format::FileFormatConfig, storage_config::StorageConfig, DataFileSource, PartitionField, + Pushdowns, ScanOperator, ScanTask, +}; #[derive(Debug)] pub struct AnonymousScanOperator { - schema: SchemaRef, - file_type: FileType, files: Vec, - columns_to_select: Option>, - limit: Option, + schema: SchemaRef, + file_format_config: Arc, + storage_config: Arc, } impl AnonymousScanOperator { - pub fn new(schema: SchemaRef, file_type: FileType, files: Vec) -> Self { + pub fn new( + files: Vec, + schema: SchemaRef, + file_format_config: Arc, + storage_config: Arc, + ) -> Self { Self { - schema, - file_type, files, - columns_to_select: None, - limit: None, + schema, + file_format_config, + storage_config, } } } @@ -40,51 +46,40 @@ impl ScanOperator for AnonymousScanOperator { &[] } - fn num_partitions(&self) -> common_error::DaftResult { - Ok(self.files.len()) - } - - fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { - for c in columns { - if self.schema.get_field(c).is_err() { - return Err(common_error::DaftError::FieldNotFound(format!( - "{c} not found in {:?}", - self.columns_to_select - ))); - } - } - let 
mut to_rtn = self; - to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect()); - Ok(to_rtn) + fn can_absorb_filter(&self) -> bool { + false } - - fn limit(self: Box, num: usize) -> DaftResult { - let mut to_rtn = self; - to_rtn.limit = Some(num); - Ok(to_rtn) + fn can_absorb_select(&self) -> bool { + false } - - fn filter(self: Box, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> { - Ok((false, self)) + fn can_absorb_limit(&self) -> bool { + false } fn to_scan_tasks( - self: Box, - ) -> DaftResult>>> { - let iter = self.files.clone().into_iter().map(move |f| { - let source = DataFileSource::AnonymousDataFile { - file_type: self.file_type, - path: f, - metadata: None, - partition_spec: None, - statistics: None, - }; - Ok(ScanTask { - source, - columns: self.columns_to_select.clone(), - limit: self.limit, - }) - }); - Ok(Box::new(iter)) + &self, + pushdowns: Pushdowns, + ) -> DaftResult>>> { + let files = self.files.clone(); + let file_format_config = self.file_format_config.clone(); + let schema = self.schema.clone(); + let storage_config = self.storage_config.clone(); + + // Create one ScanTask per file. + Ok(Box::new(files.into_iter().map(move |f| { + Ok(ScanTask::new( + vec![DataFileSource::AnonymousDataFile { + path: f.to_string(), + metadata: None, + partition_spec: None, + statistics: None, + }], + file_format_config.clone(), + schema.clone(), + storage_config.clone(), + pushdowns.columns.clone(), + pushdowns.limit, + )) + }))) } } diff --git a/src/daft-plan/src/source_info/file_format.rs b/src/daft-scan/src/file_format.rs similarity index 74% rename from src/daft-plan/src/source_info/file_format.rs rename to src/daft-scan/src/file_format.rs index 60f6acc863..efd3e8beaf 100644 --- a/src/daft-plan/src/source_info/file_format.rs +++ b/src/daft-scan/src/file_format.rs @@ -1,11 +1,15 @@ -use daft_core::impl_bincode_py_state_serialization; +use common_error::{DaftError, DaftResult}; +use daft_core::{datatypes::TimeUnit, impl_bincode_py_state_serialization}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; #[cfg(feature = "python")] -use pyo3::{ - pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, PyObject, PyResult, PyTypeInfo, - Python, ToPyObject, +use { + daft_core::python::datatype::PyTimeUnit, + pyo3::{ + exceptions::PyValueError, pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, + PyObject, PyResult, PyTypeInfo, Python, ToPyObject, + }, }; /// Format of a file, e.g. Parquet, CSV, JSON. 
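`FileFormat` gains a `FromStr` impl in the next hunk; together with the removal of the old `FileType` enum (and its `Avro`/`Orc` variants) from `src/daft-scan/src/lib.rs` later in this patch, string-to-format parsing now goes through `FileFormat`. A small usage sketch, assuming the trim/case-insensitive matching shown in that impl:

```rust
// Sketch only: parsing user-facing format strings via the FromStr impl added below.
use std::str::FromStr;

use daft_scan::file_format::FileFormat;

fn main() {
    assert!(matches!(FileFormat::from_str("Parquet"), Ok(FileFormat::Parquet)));
    // Matching trims whitespace and ignores case.
    assert!(matches!(FileFormat::from_str(" csv "), Ok(FileFormat::Csv)));
    // Avro/Orc are no longer representable; unknown formats produce a DaftError::TypeError.
    assert!(FileFormat::from_str("avro").is_err());
}
```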
@@ -17,6 +21,27 @@ pub enum FileFormat { Json, } +impl FromStr for FileFormat { + type Err = DaftError; + + fn from_str(file_format: &str) -> DaftResult { + use FileFormat::*; + + if file_format.trim().eq_ignore_ascii_case("parquet") { + Ok(Parquet) + } else if file_format.trim().eq_ignore_ascii_case("csv") { + Ok(Csv) + } else if file_format.trim().eq_ignore_ascii_case("json") { + Ok(Json) + } else { + Err(DaftError::TypeError(format!( + "FileFormat {} not supported!", + file_format + ))) + } + } +} + impl_bincode_py_state_serialization!(FileFormat); impl From<&FileFormatConfig> for FileFormat { @@ -53,7 +78,8 @@ impl FileFormatConfig { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ParquetSourceConfig { - multithreaded_io: bool, + pub coerce_int96_timestamp_unit: TimeUnit, + pub row_groups: Option>, } #[cfg(feature = "python")] @@ -61,13 +87,23 @@ pub struct ParquetSourceConfig { impl ParquetSourceConfig { /// Create a config for a Parquet data source. #[new] - fn new(multithreaded_io: bool) -> Self { - Self { multithreaded_io } + fn new(coerce_int96_timestamp_unit: Option, row_groups: Option>) -> Self { + Self { + coerce_int96_timestamp_unit: coerce_int96_timestamp_unit + .unwrap_or(TimeUnit::Nanoseconds.into()) + .into(), + row_groups, + } } #[getter] - fn multithreaded_io(&self) -> PyResult { - Ok(self.multithreaded_io) + fn row_groups(&self) -> PyResult>> { + Ok(self.row_groups.clone()) + } + + #[getter] + fn coerce_int96_timestamp_unit(&self) -> PyResult { + Ok(self.coerce_int96_timestamp_unit.into()) } } @@ -102,14 +138,20 @@ impl CsvSourceConfig { double_quote: bool, buffer_size: Option, chunk_size: Option, - ) -> Self { - Self { + ) -> PyResult { + if delimiter.as_bytes().len() != 1 { + return Err(PyValueError::new_err(format!( + "Cannot create CsvSourceConfig with delimiter with length: {}", + delimiter.len() + ))); + } + Ok(Self { delimiter, has_headers, double_quote, buffer_size, chunk_size, - } + }) } } diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 1043a76bf4..9c2c7a9025 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,33 +2,34 @@ use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use daft_io::{get_io_client, get_runtime, IOStatsContext}; +use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext}; +use daft_parquet::read::ParquetSchemaInferenceOptions; -use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; -#[derive(Debug)] +use crate::{ + file_format::{CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig}, + storage_config::StorageConfig, + DataFileSource, PartitionField, Pushdowns, ScanOperator, ScanTask, +}; +#[derive(Debug, PartialEq, Hash)] pub struct GlobScanOperator { glob_path: String, - file_type: FileType, - columns_to_select: Option>, - limit: Option, + file_format_config: Arc, schema: SchemaRef, - io_config: Arc, + storage_config: Arc, } fn run_glob( glob_path: &str, - io_config: Arc, limit: Option, + io_client: Arc, + runtime: Arc, ) -> DaftResult> { - // Use multi-threaded runtime which should be global Arc-ed cached singletons - let runtime = get_runtime(true)?; - let io_client = get_io_client(true, io_config)?; - + let (_, parsed_glob_path) = parse_url(glob_path)?; let _rt_guard = runtime.enter(); runtime.block_on(async { Ok(io_client .as_ref() - .glob(glob_path, None, None, limit, 
None) + .glob(&parsed_glob_path, None, None, limit, None) .await? .into_iter() .map(|fm| fm.filepath) @@ -36,55 +37,101 @@ fn run_glob( }) } +fn get_io_client_and_runtime( + storage_config: &StorageConfig, +) -> DaftResult<(Arc, Arc)> { + // Grab an IOClient and Runtime + // TODO: This should be cleaned up and hidden behind a better API from daft-io + match storage_config { + StorageConfig::Native(cfg) => { + let multithreaded_io = cfg.multithreaded_io; + Ok(( + get_runtime(multithreaded_io)?, + get_io_client( + multithreaded_io, + Arc::new(cfg.io_config.clone().unwrap_or_default()), + )?, + )) + } + #[cfg(feature = "python")] + StorageConfig::Python(cfg) => { + let multithreaded_io = true; // Hardcode to use multithreaded IO if Python storage config is used for data fetches + Ok(( + get_runtime(multithreaded_io)?, + get_io_client( + multithreaded_io, + Arc::new(cfg.io_config.clone().unwrap_or_default()), + )?, + )) + } + } +} + impl GlobScanOperator { - pub fn _try_new( + pub fn try_new( glob_path: &str, - file_type: FileType, - io_config: Arc, + file_format_config: Arc, + storage_config: Arc, + schema: Option, ) -> DaftResult { - let paths = run_glob(glob_path, io_config.clone(), Some(1))?; - let first_filepath = paths[0].as_str(); - - let schema = match file_type { - FileType::Parquet => { - let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io - let io_stats = IOStatsContext::new(format!( - "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}" - )); - daft_parquet::read::read_parquet_schema( - first_filepath, - io_client, - Some(io_stats), - Default::default(), // TODO: pass-through schema inference options - )? - } - FileType::Csv => { - let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io - let io_stats = IOStatsContext::new(format!( - "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}" - )); - let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( - first_filepath, - true, // TODO: pass-through schema inference options - None, // TODO: pass-through schema inference options - true, // TODO: pass-through schema inference options - None, // TODO: pass-through schema inference options - io_client, - Some(io_stats), - )?; - schema + let schema = match schema { + Some(s) => s, + None => { + let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?; + let paths = run_glob(glob_path, Some(1), io_client.clone(), io_runtime)?; + let first_filepath = paths[0].as_str(); + let inferred_schema = match file_format_config.as_ref() { + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + .. + }) => { + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}" + )); + daft_parquet::read::read_parquet_schema( + first_filepath, + io_client.clone(), + Some(io_stats), + ParquetSchemaInferenceOptions { + coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + }, + )? + } + FileFormatConfig::Csv(CsvSourceConfig { + delimiter, + has_headers, + double_quote, + .. 
+ }) => { + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}" + )); + let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( + first_filepath, + *has_headers, + Some(delimiter.as_bytes()[0]), + *double_quote, + None, + io_client, + Some(io_stats), + )?; + schema + } + FileFormatConfig::Json(JsonSourceConfig {}) => { + // NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement + // a daft_json crate that gives us native JSON schema inference + todo!("Implement schema inference from JSON in GlobScanOperator"); + } + }; + Arc::new(inferred_schema) } - FileType::Avro => todo!("Schema inference for Avro not implemented"), - FileType::Orc => todo!("Schema inference for Orc not implemented"), }; Ok(Self { glob_path: glob_path.to_string(), - file_type, - columns_to_select: None, - limit: None, - schema: Arc::new(schema), - io_config, + file_format_config, + schema, + storage_config, }) } } @@ -104,52 +151,44 @@ impl ScanOperator for GlobScanOperator { &[] } - fn num_partitions(&self) -> common_error::DaftResult { - unimplemented!("Cannot get number of partitions -- this will not be implemented."); + fn can_absorb_filter(&self) -> bool { + false } - - fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { - for c in columns { - if self.schema.get_field(c).is_err() { - return Err(common_error::DaftError::FieldNotFound(format!( - "{c} not found in {:?}", - self.columns_to_select - ))); - } - } - let mut to_rtn = self; - to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect()); - Ok(to_rtn) - } - - fn limit(self: Box, num: usize) -> DaftResult { - let mut to_rtn = self; - to_rtn.limit = Some(num); - Ok(to_rtn) + fn can_absorb_select(&self) -> bool { + false } - - fn filter(self: Box, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> { - Ok((false, self)) + fn can_absorb_limit(&self) -> bool { + false } fn to_scan_tasks( - self: Box, - ) -> DaftResult>>> { - let files = run_glob(self.glob_path.as_str(), self.io_config.clone(), None)?; - let iter = files.into_iter().map(move |f| { - let source = DataFileSource::AnonymousDataFile { - file_type: self.file_type, - path: f, - metadata: None, - partition_spec: None, - statistics: None, - }; - Ok(ScanTask { - source, - columns: self.columns_to_select.clone(), - limit: self.limit, - }) - }); - Ok(Box::new(iter)) + &self, + pushdowns: Pushdowns, + ) -> DaftResult>>> { + let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?; + + // TODO: This runs the glob to exhaustion, but we should return an iterator instead + let files = run_glob(self.glob_path.as_str(), None, io_client, io_runtime)?; + let file_format_config = self.file_format_config.clone(); + let schema = self.schema.clone(); + let storage_config = self.storage_config.clone(); + + // Create one ScanTask per file. We should find a way to perform streaming from the glob instead + // of materializing here. 
+ Ok(Box::new(files.into_iter().map(move |f| { + Ok(ScanTask::new( + vec![DataFileSource::AnonymousDataFile { + path: f.to_string(), + metadata: None, + partition_spec: None, + statistics: None, + }], + file_format_config.clone(), + schema.clone(), + storage_config.clone(), + pushdowns.columns.clone(), + pushdowns.limit, + )) + }))) } } diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index f23fbb0cae..ab7ae11de7 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -1,62 +1,38 @@ use std::{ fmt::{Debug, Display}, - str::FromStr, + hash::{Hash, Hasher}, + sync::Arc, }; -use common_error::{DaftError, DaftResult}; +use common_error::DaftResult; use daft_core::{datatypes::Field, schema::SchemaRef}; -use daft_dsl::Expr; +use daft_dsl::{Expr, ExprRef}; use daft_stats::{PartitionSpec, TableMetadata, TableStatistics}; +use file_format::FileFormatConfig; use serde::{Deserialize, Serialize}; mod anonymous; +pub mod file_format; mod glob; +#[cfg(feature = "python")] +pub mod py_object_serde; + #[cfg(feature = "python")] pub mod python; +pub mod storage_config; #[cfg(feature = "python")] pub use python::register_modules; +use storage_config::StorageConfig; -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] -pub enum FileType { - Parquet, - Avro, - Orc, - Csv, -} - -impl FromStr for FileType { - type Err = DaftError; - - fn from_str(file_type: &str) -> DaftResult { - use FileType::*; - if file_type.trim().eq_ignore_ascii_case("parquet") { - Ok(Parquet) - } else if file_type.trim().eq_ignore_ascii_case("avro") { - Ok(Avro) - } else if file_type.trim().eq_ignore_ascii_case("orc") { - Ok(Orc) - } else if file_type.trim().eq_ignore_ascii_case("csv") { - Ok(Csv) - } else { - Err(DaftError::TypeError(format!( - "FileType {} not supported!", - file_type - ))) - } - } -} - -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum DataFileSource { AnonymousDataFile { - file_type: FileType, path: String, metadata: Option, partition_spec: Option, statistics: Option, }, CatalogDataFile { - file_type: FileType, path: String, metadata: TableMetadata, partition_spec: PartitionSpec, @@ -64,31 +40,218 @@ pub enum DataFileSource { }, } -#[derive(Serialize, Deserialize)] +impl DataFileSource { + pub fn get_path(&self) -> &str { + match self { + Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path, + } + } + pub fn get_metadata(&self) -> Option<&TableMetadata> { + match self { + Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(), + Self::CatalogDataFile { metadata, .. } => Some(metadata), + } + } + + pub fn get_statistics(&self) -> Option<&TableStatistics> { + match self { + Self::AnonymousDataFile { statistics, .. } + | Self::CatalogDataFile { statistics, .. } => statistics.as_ref(), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] pub struct ScanTask { - // Micropartition will take this in as an input - source: DataFileSource, - columns: Option>, - limit: Option, + pub sources: Vec, + pub file_format_config: Arc, + pub schema: SchemaRef, + pub storage_config: Arc, + // TODO(Clark): Directly use the Pushdowns struct as part of the ScanTask struct? 
+ pub columns: Option<Arc<Vec<String>>>, + pub limit: Option<usize>, + pub metadata: Option<TableMetadata>, + pub statistics: Option<TableStatistics>, } -#[derive(Serialize, Deserialize)] + +impl ScanTask { + pub fn new( + sources: Vec<DataFileSource>, + file_format_config: Arc<FileFormatConfig>, + schema: SchemaRef, + storage_config: Arc<StorageConfig>, + columns: Option<Arc<Vec<String>>>, + limit: Option<usize>, + ) -> Self { + assert!(!sources.is_empty()); + let (length, statistics) = sources + .iter() + .map(|s| { + ( + s.get_metadata().map(|m| m.length), + s.get_statistics().cloned(), + ) + }) + .reduce(|(acc_len, acc_stats), (curr_len, curr_stats)| { + ( + acc_len.and_then(|acc_len| curr_len.map(|curr_len| acc_len + curr_len)), + acc_stats.and_then(|acc_stats| { + curr_stats.map(|curr_stats| acc_stats.union(&curr_stats).unwrap()) + }), + ) + }) + .unwrap(); + let metadata = length.map(|l| TableMetadata { length: l }); + Self { + sources, + file_format_config, + schema, + storage_config, + columns, + limit, + metadata, + statistics, + } + } + + pub fn num_rows(&self) -> Option<usize> { + self.metadata.as_ref().map(|m| m.length) + } + + pub fn size_bytes(&self) -> Option<usize> { + self.statistics.as_ref().and_then(|s| { + self.num_rows() + .and_then(|num_rows| Some(num_rows * s.estimate_row_size().ok()?)) + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct PartitionField { field: Field, source_field: Option<Field>, transform: Option<Expr>, } -pub trait ScanOperator: Send + Display { +pub trait ScanOperator: Send + Sync + Display + Debug { fn schema(&self) -> SchemaRef; fn partitioning_keys(&self) -> &[PartitionField]; - fn num_partitions(&self) -> DaftResult<usize>; - - // also returns a bool to indicate if the scan operator can "absorb" the predicate - fn filter(self: Box<Self>, predicate: &Expr) -> DaftResult<(bool, ScanOperatorRef)>; - fn select(self: Box<Self>, columns: &[&str]) -> DaftResult<ScanOperatorRef>; - fn limit(self: Box<Self>, num: usize) -> DaftResult<ScanOperatorRef>; - fn to_scan_tasks(self: Box<Self>) - -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>>; + + fn can_absorb_filter(&self) -> bool; + fn can_absorb_select(&self) -> bool; + fn can_absorb_limit(&self) -> bool; + fn to_scan_tasks( + &self, + pushdowns: Pushdowns, + ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>>; +} + +/// Light transparent wrapper around an Arc<dyn ScanOperator> that implements Eq/PartialEq/Hash +/// functionality to be performed on the **pointer** instead of on the value in the pointer. +/// +/// This lets us get around having to implement full hashing/equality on [`ScanOperator`], which +/// is difficult because we sometimes have weird Python implementations that can be hard to check. +/// +/// [`ScanOperatorRef`] should be thus held by structs that need to check the "sameness" of the +/// underlying ScanOperator instance, for example in the Scan nodes in a logical plan which need +/// to check for sameness of Scan nodes during plan optimization. 
+#[derive(Debug, Clone)] +pub struct ScanOperatorRef(pub Arc); + +impl Hash for ScanOperatorRef { + fn hash(&self, state: &mut H) { + Arc::as_ptr(&self.0).hash(state) + } } -pub type ScanOperatorRef = Box; +impl PartialEq for ScanOperatorRef { + fn eq(&self, other: &ScanOperatorRef) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +impl std::cmp::Eq for ScanOperatorRef {} + +impl Display for ScanOperatorRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ScanExternalInfo { + pub scan_op: ScanOperatorRef, + pub source_schema: SchemaRef, + pub partitioning_keys: Vec, + pub pushdowns: Pushdowns, +} + +impl ScanExternalInfo { + pub fn new( + scan_op: ScanOperatorRef, + source_schema: SchemaRef, + partitioning_keys: Vec, + pushdowns: Pushdowns, + ) -> Self { + Self { + scan_op, + source_schema, + partitioning_keys, + pushdowns, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Pushdowns { + /// Optional filters to apply to the source data. + pub filters: Option>>, + /// Optional columns to select from the source data. + pub columns: Option>>, + /// Optional number of rows to read. + pub limit: Option, +} + +impl Default for Pushdowns { + fn default() -> Self { + Self::new(None, None, None) + } +} + +impl Pushdowns { + pub fn new( + filters: Option>>, + columns: Option>>, + limit: Option, + ) -> Self { + Self { + filters, + columns, + limit, + } + } + + pub fn with_limit(&self, limit: Option) -> Self { + Self { + filters: self.filters.clone(), + columns: self.columns.clone(), + limit, + } + } + + pub fn with_filters(&self, filters: Option>>) -> Self { + Self { + filters, + columns: self.columns.clone(), + limit: self.limit, + } + } + + pub fn with_columns(&self, columns: Option>>) -> Self { + Self { + filters: self.filters.clone(), + columns, + limit: self.limit, + } + } +} diff --git a/src/daft-plan/src/source_info/py_object_serde.rs b/src/daft-scan/src/py_object_serde.rs similarity index 88% rename from src/daft-plan/src/source_info/py_object_serde.rs rename to src/daft-scan/src/py_object_serde.rs index f85b1b5678..2dde3fff1b 100644 --- a/src/daft-plan/src/source_info/py_object_serde.rs +++ b/src/daft-scan/src/py_object_serde.rs @@ -4,7 +4,7 @@ use serde::{ }; use std::fmt; -pub(super) fn serialize_py_object(obj: &PyObject, s: S) -> Result +pub fn serialize_py_object(obj: &PyObject, s: S) -> Result where S: Serializer, { @@ -56,7 +56,7 @@ impl<'de> Visitor<'de> for PyObjectVisitor { } #[cfg(feature = "python")] -pub(super) fn deserialize_py_object<'de, D>(d: D) -> Result +pub fn deserialize_py_object<'de, D>(d: D) -> Result where D: Deserializer<'de>, { @@ -69,10 +69,7 @@ where struct PyObjSerdeWrapper<'a>(#[serde(serialize_with = "serialize_py_object")] &'a PyObject); #[cfg(feature = "python")] -pub(super) fn serialize_py_object_optional( - obj: &Option, - s: S, -) -> Result +pub fn serialize_py_object_optional(obj: &Option, s: S) -> Result where S: Serializer, { @@ -108,7 +105,7 @@ impl<'de> Visitor<'de> for OptPyObjectVisitor { } #[cfg(feature = "python")] -pub(super) fn deserialize_py_object_optional<'de, D>(d: D) -> Result, D::Error> +pub fn deserialize_py_object_optional<'de, D>(d: D) -> Result, D::Error> where D: Deserializer<'de>, { diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index 3a56e6f562..ddf13d97a2 100644 --- a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -1,46 +1,113 @@ use 
pyo3::prelude::*; pub mod pylib { + use std::sync::Arc; + use pyo3::prelude::*; - use std::str::FromStr; use daft_core::python::schema::PySchema; use pyo3::pyclass; + use serde::{Deserialize, Serialize}; use crate::anonymous::AnonymousScanOperator; - use crate::FileType; - use crate::ScanOperatorRef; + use crate::file_format::PyFileFormatConfig; + use crate::glob::GlobScanOperator; + use crate::storage_config::PyStorageConfig; + use crate::{ScanOperatorRef, ScanTask}; #[pyclass(module = "daft.daft", frozen)] - pub(crate) struct ScanOperator { + #[derive(Debug, Clone)] + pub struct ScanOperatorHandle { scan_op: ScanOperatorRef, } #[pymethods] - impl ScanOperator { + impl ScanOperatorHandle { pub fn __repr__(&self) -> PyResult { Ok(format!("{}", self.scan_op)) } #[staticmethod] pub fn anonymous_scan( - schema: PySchema, - file_type: &str, files: Vec, + schema: PySchema, + file_format_config: PyFileFormatConfig, + storage_config: PyStorageConfig, ) -> PyResult { let schema = schema.schema; - let operator = Box::new(AnonymousScanOperator::new( - schema, - FileType::from_str(file_type)?, + let operator = Arc::new(AnonymousScanOperator::new( files, + schema, + file_format_config.into(), + storage_config.into(), )); - Ok(ScanOperator { scan_op: operator }) + Ok(ScanOperatorHandle { + scan_op: ScanOperatorRef(operator), + }) + } + + #[staticmethod] + pub fn glob_scan( + glob_path: &str, + file_format_config: PyFileFormatConfig, + storage_config: PyStorageConfig, + schema: Option, + ) -> PyResult { + let operator = Arc::new(GlobScanOperator::try_new( + glob_path, + file_format_config.into(), + storage_config.into(), + schema.map(|s| s.schema), + )?); + Ok(ScanOperatorHandle { + scan_op: ScanOperatorRef(operator), + }) + } + } + + impl From for ScanOperatorHandle { + fn from(value: ScanOperatorRef) -> Self { + Self { scan_op: value } + } + } + + impl From for ScanOperatorRef { + fn from(value: ScanOperatorHandle) -> Self { + value.scan_op + } + } + + #[pyclass(module = "daft.daft", name = "ScanTask", frozen)] + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct PyScanTask(pub Arc); + + #[pymethods] + impl PyScanTask { + pub fn num_rows(&self) -> PyResult> { + Ok(self.0.num_rows().map(i64::try_from).transpose()?) + } + + pub fn size_bytes(&self) -> PyResult> { + Ok(self.0.size_bytes().map(i64::try_from).transpose()?) 
+ } + } + + impl From> for PyScanTask { + fn from(value: Arc) -> Self { + Self(value) + } + } + + impl From for Arc { + fn from(value: PyScanTask) -> Self { + value.0 } } } pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; Ok(()) } diff --git a/src/daft-plan/src/source_info/storage_config.rs b/src/daft-scan/src/storage_config.rs similarity index 91% rename from src/daft-plan/src/source_info/storage_config.rs rename to src/daft-scan/src/storage_config.rs index 5ef40640bc..fa705cf4d4 100644 --- a/src/daft-plan/src/source_info/storage_config.rs +++ b/src/daft-scan/src/storage_config.rs @@ -29,11 +29,15 @@ pub enum StorageConfig { #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct NativeStorageConfig { pub io_config: Option, + pub multithreaded_io: bool, } impl NativeStorageConfig { - pub fn new_internal(io_config: Option) -> Self { - Self { io_config } + pub fn new_internal(multithreaded_io: bool, io_config: Option) -> Self { + Self { + io_config, + multithreaded_io, + } } } @@ -41,14 +45,19 @@ impl NativeStorageConfig { #[pymethods] impl NativeStorageConfig { #[new] - pub fn new(io_config: Option) -> Self { - Self::new_internal(io_config.map(|c| c.config)) + pub fn new(multithreaded_io: bool, io_config: Option) -> Self { + Self::new_internal(multithreaded_io, io_config.map(|c| c.config)) } #[getter] pub fn io_config(&self) -> Option { self.io_config.clone().map(|c| c.into()) } + + #[getter] + pub fn multithreaded_io(&self) -> bool { + self.multithreaded_io + } } /// Storage configuration for the legacy Python I/O layer. diff --git a/src/daft-stats/src/partition_spec.rs b/src/daft-stats/src/partition_spec.rs index c75db0344d..a5e60a19bb 100644 --- a/src/daft-stats/src/partition_spec.rs +++ b/src/daft-stats/src/partition_spec.rs @@ -1,6 +1,6 @@ use daft_table::Table; -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PartitionSpec { keys: Table, } diff --git a/src/daft-stats/src/table_metadata.rs b/src/daft-stats/src/table_metadata.rs index 8c6d980dae..55112819f4 100644 --- a/src/daft-stats/src/table_metadata.rs +++ b/src/daft-stats/src/table_metadata.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TableMetadata { pub length: usize, } diff --git a/tests/table/table_io/test_csv.py b/tests/table/table_io/test_csv.py index bcf01a03ec..e6c70cef15 100644 --- a/tests/table/table_io/test_csv.py +++ b/tests/table/table_io/test_csv.py @@ -18,7 +18,7 @@ def storage_config_from_use_native_downloader(use_native_downloader: bool) -> StorageConfig: if use_native_downloader: - return StorageConfig.native(NativeStorageConfig(None)) + return StorageConfig.native(NativeStorageConfig(True, None)) else: return StorageConfig.python(PythonStorageConfig(None)) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 83c6e67e2b..84dc5c12c2 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -23,7 +23,7 @@ def storage_config_from_use_native_downloader(use_native_downloader: bool) -> StorageConfig: if use_native_downloader: - return StorageConfig.native(NativeStorageConfig(None)) + return StorageConfig.native(NativeStorageConfig(True, None)) else: return StorageConfig.python(PythonStorageConfig(None))
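To make the reworked `ScanOperator` contract concrete, here is a minimal sketch of an implementation living inside the `daft-scan` crate (the `anonymous` and `glob` operators above are the real ones; `SingleFileScanOperator` and its single-path behavior are made up for illustration). Wrapping such an operator in `ScanOperatorRef(Arc::new(...))` and handing it to the logical plan builder is what the `ScanOperatorHandle` pymethods above do.

```rust
// Sketch only: a no-frills ScanOperator against the new trait, assuming it is
// compiled inside the daft-scan crate (hence the `crate::` imports).
use std::{fmt::Display, sync::Arc};

use common_error::DaftResult;
use daft_core::schema::SchemaRef;

use crate::{
    file_format::FileFormatConfig, storage_config::StorageConfig, DataFileSource, PartitionField,
    Pushdowns, ScanOperator, ScanTask,
};

#[derive(Debug)]
struct SingleFileScanOperator {
    path: String,
    schema: SchemaRef,
    file_format_config: Arc<FileFormatConfig>,
    storage_config: Arc<StorageConfig>,
}

impl Display for SingleFileScanOperator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SingleFileScanOperator({})", self.path)
    }
}

impl ScanOperator for SingleFileScanOperator {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn partitioning_keys(&self) -> &[PartitionField] {
        &[]
    }

    // This operator cannot evaluate predicates, projections, or limits itself, so
    // the planner keeps those ops in the plan and only threads the pushdowns through.
    fn can_absorb_filter(&self) -> bool {
        false
    }
    fn can_absorb_select(&self) -> bool {
        false
    }
    fn can_absorb_limit(&self) -> bool {
        false
    }

    fn to_scan_tasks(
        &self,
        pushdowns: Pushdowns,
    ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTask>>>> {
        // One ScanTask for the single backing file; column/limit pushdowns are
        // recorded on the task so the MicroPartition reader can apply them.
        let task = ScanTask::new(
            vec![DataFileSource::AnonymousDataFile {
                path: self.path.clone(),
                metadata: None,
                partition_spec: None,
                statistics: None,
            }],
            self.file_format_config.clone(),
            self.schema.clone(),
            self.storage_config.clone(),
            pushdowns.columns.clone(),
            pushdowns.limit,
        );
        Ok(Box::new(std::iter::once(Ok(task))))
    }
}
```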