Skip to content

Commit

Permalink
Docs: Add docs to RepartitionExec and architecture guide
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 17, 2023
1 parent b0d2491 commit 7de4552
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 11 deletions.
19 changes: 12 additions & 7 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,17 @@
//! ```
//!
//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
//! format, largely with functions from the [arrow] crate. When
//! [`execute`] is called, a [`SendableRecordBatchStream`] is returned
//! that produces the desired output as a [`Stream`] of [`RecordBatch`]es.
//! format, making heavy use of functions from the [arrow]
//! crate. Calling [`execute`] produces 1 or more partitions of data,
//! consisting an operator that implements
//! [`SendableRecordBatchStream`].
//!
//! Values are
//! represented with [`ColumnarValue`], which are either single
//! constant values ([`ScalarValue`]) or Arrow Arrays ([`ArrayRef`]).
//! Values are represented with [`ColumnarValue`], which are either
//! [`ScalarValue`] (single constant values) or [`ArrayRef`] (Arrow
//! Arrays).
//!
//! Balanced parallelism is achieved using [`RepartitionExec`], which
//! implements a [Volcano style] "Exchange".
//!
//! [`execute`]: physical_plan::ExecutionPlan::execute
//! [`SendableRecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
Expand All @@ -345,9 +349,10 @@
//! [`ArrayRef`]: arrow::array::ArrayRef
//! [`Stream`]: futures::stream::Stream
//!
//!
//! See the [implementors of `ExecutionPlan`] for a list of physical operators available.
//!
//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
//! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
//!
//! ## State Management and Configuration
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}

/// Trait for a stream of record batches.
/// Trait for a [`Stream`] of [`RecordBatch`]es
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
Expand Down
63 changes: 61 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,67 @@ impl BatchPartitioner {
}
}

/// The repartition operator maps N input partitions to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
/// Maps `N` input partitions to `M output partitions based on a
/// [`Partitioning`] scheme.
///
/// # Background
///
/// DataFusion, like most other commercial systems, with the the
/// notable exception of DuckDB, uses the "Exchange Operator" based
/// approach to parallelism which works well in practice given
/// sufficient care in implementation.
///
/// DataFusion's planner picks the target number of partitions and
/// then `RepartionExec` redistributes [`RecordBatch`]es to that number
/// of output partitions.
///
/// For example, given `target_partitions=3` (trying to use 3 cores)
/// but scanning an input with 2 partitions, `RepartitionExec` can be
/// used to get 3 even streams of `RecordBatch`es
///
///
///```text
/// ▲ ▲ ▲
/// │ │ │
/// │ │ │
/// │ │ │
///┌───────────────┐ ┌───────────────┐ ┌───────────────┐
///│ GroupBy │ │ GroupBy │ │ GroupBy │
///│ (Partial) │ │ (Partial) │ │ (Partial) │
///└───────────────┘ └───────────────┘ └───────────────┘
/// ▲ ▲ ▲
/// └──────────────────┼──────────────────┘
/// │
/// ┌─────────────────────────┐
/// │ RepartitionExec │
/// │ (hash/round robin) │
/// └─────────────────────────┘
/// ▲ ▲
/// ┌───────────┘ └───────────┐
/// │ │
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input :
/// : Partition 0 ; : Partition 1 ;
/// ╲ ╱ ╲ ╱
/// '─. ,─' '─. ,─'
/// `───────' `───────'
///```
///
/// # Output Ordering
///
/// No guarantees are made about the order of the resulting
/// partitions unless `preserve_order` is set.
///
/// # Footnote
///
/// The "Exchange Operator" was first described in the 1989 paper
/// [Encapsulation of parallelism in the Volcano query processing
/// system
/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
/// which uses the term "Exchange" for the concept of repartitioning
/// data across threads.
#[derive(Debug)]
pub struct RepartitionExec {
/// Input execution plan
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub enum LogicalPlan {
Join(Join),
/// Apply Cross Join to two logical plans
CrossJoin(CrossJoin),
/// Repartition the plan based on a partitioning scheme.
/// Repartition the plan based on a partitioning scheme
Repartition(Repartition),
/// Union multiple inputs
Union(Union),
Expand Down

0 comments on commit 7de4552

Please sign in to comment.