Skip to content

Commit

Permalink
Minor: Move streams out of main physical_plan module
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 8, 2023
1 parent 19988a8 commit ea3dc96
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 55 deletions.
50 changes: 5 additions & 45 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,19 @@ pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use futures::stream::{Stream, TryStreamExt};
use futures::stream::TryStreamExt;
use std::fmt;
use std::fmt::Debug;
use tokio::task::JoinSet;

use datafusion_common::tree_node::Transformed;
use datafusion_common::DataFusionError;
use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};

/// Trait for types that stream [arrow::record_batch::RecordBatch]es.
///
/// A `RecordBatchStream` is a `Stream` whose items are `Result<RecordBatch>`,
/// extended with a method that reports the schema of those batches.
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
/// Returns the schema of this `RecordBatchStream`.
///
/// Implementations of this trait should guarantee that every `RecordBatch` returned by
/// this stream has the same schema as the one returned from this method.
fn schema(&self) -> SchemaRef;
}

/// Type alias for a pinned, boxed [`RecordBatchStream`] trait object that is also
/// `Send`, i.e. a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

/// A `RecordBatchStream` that will produce no results.
///
/// The only state it holds is the schema it was constructed with.
pub struct EmptyRecordBatchStream {
    /// Schema of this stream, wrapped by Arc
    schema: SchemaRef,
}

impl EmptyRecordBatchStream {
    /// Builds an empty `RecordBatchStream` that carries the given `schema`
    pub fn new(schema: SchemaRef) -> Self {
        EmptyRecordBatchStream { schema }
    }
}

impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for EmptyRecordBatchStream {
    type Item = Result<RecordBatch>;

    /// Reports end-of-stream on every poll; the task context is not used
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // There is never a batch to hand out, so completion is signalled immediately.
        let end_of_stream: Option<Self::Item> = None;
        Poll::Ready(end_of_stream)
    }
}
// Re-exported for backwards compatibility
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use stream::EmptyRecordBatchStream;

/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
Expand Down
51 changes: 41 additions & 10 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! Stream wrappers for physical operators

use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
Expand Down Expand Up @@ -231,9 +234,9 @@ impl Stream for RecordBatchReceiverStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
Expand Down Expand Up @@ -276,10 +279,7 @@ where
{
type Item = Result<RecordBatch>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}

Expand All @@ -297,6 +297,37 @@ where
}
}

/// A `RecordBatchStream` that will produce no results.
///
/// The only state it holds is the schema it was constructed with.
pub struct EmptyRecordBatchStream {
    /// Schema of this stream, wrapped by Arc
    schema: SchemaRef,
}

impl EmptyRecordBatchStream {
    /// Builds an empty `RecordBatchStream` that carries the given `schema`
    pub fn new(schema: SchemaRef) -> Self {
        EmptyRecordBatchStream { schema }
    }
}

impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for EmptyRecordBatchStream {
    type Item = Result<RecordBatch>;

    /// Reports end-of-stream on every poll; the task context is not used
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // There is never a batch to hand out, so completion is signalled immediately.
        let end_of_stream: Option<Self::Item> = None;
        Poll::Ready(end_of_stream)
    }
}

/// Stream wrapper that records `BaselineMetrics` for a particular
/// [`SendableRecordBatchStream`] (likely a partition)
pub(crate) struct ObservedStream {
Expand Down Expand Up @@ -326,9 +357,9 @@ impl futures::Stream for ObservedStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.inner.poll_next_unpin(cx);
self.baseline_metrics.record_poll(poll)
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ name = "datafusion_execution"
path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "28.0.0" }
datafusion-expr = { path = "../expr", version = "28.0.0" }
futures = "0.3"
hashbrown = { version = "0.14", features = ["raw"] }
log = "^0.4"
object_store = "0.6.1"
Expand Down
2 changes: 2 additions & 0 deletions datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ pub mod memory_pool;
pub mod object_store;
pub mod registry;
pub mod runtime_env;
mod stream;
mod task;

pub use disk_manager::DiskManager;
pub use registry::FunctionRegistry;
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use task::TaskContext;
33 changes: 33 additions & 0 deletions datafusion/execution/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::Result;
use futures::Stream;
use std::pin::Pin;

/// Trait for types that stream [arrow::record_batch::RecordBatch]es.
///
/// A `RecordBatchStream` is a [`Stream`] whose items are `Result<RecordBatch>`,
/// extended with a method that reports the schema of those batches.
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
/// Returns the schema of this `RecordBatchStream`.
///
/// Implementations of this trait should guarantee that every `RecordBatch` returned by
/// this stream has the same schema as the one returned from this method.
fn schema(&self) -> SchemaRef;
}

/// Type alias for a pinned, boxed [`RecordBatchStream`] trait object that is also
/// `Send`, i.e. a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

0 comments on commit ea3dc96

Please sign in to comment.