Skip to content

Commit

Permalink
refactor: remove handle_query from the RegionEngine trait
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed May 15, 2024
1 parent 5d7416b commit 04a15c2
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 65 deletions.
9 changes: 0 additions & 9 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
Expand Down Expand Up @@ -181,14 +180,6 @@ impl RegionEngine for MockRegionEngine {
Ok(RegionResponse::new(0))
}

async fn handle_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
unimplemented!()
}

async fn handle_partitioned_query(
&self,
_region_id: RegionId,
Expand Down
28 changes: 14 additions & 14 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ impl FileRegionEngine {
inner: Arc::new(EngineInner::new(object_store)),
}
}

async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
}
}

#[async_trait]
Expand All @@ -70,20 +84,6 @@ impl RegionEngine for FileRegionEngine {
.map_err(BoxedError::new)
}

async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
}

async fn handle_partitioned_query(
&self,
region_id: RegionId,
Expand Down
24 changes: 12 additions & 12 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,6 @@ impl RegionEngine for MetricEngine {
})
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}

async fn handle_partitioned_query(
&self,
region_id: RegionId,
Expand Down Expand Up @@ -263,6 +251,18 @@ impl MetricEngine {
.logical_regions(physical_region_id)
.await
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}
}

struct MetricEngineInner {
Expand Down
28 changes: 14 additions & 14 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ impl MitoEngine {
Ok(region.region_usage().await)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
pub async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner()
Expand Down Expand Up @@ -322,20 +336,6 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

#[tracing::instrument(skip_all)]
async fn handle_partitioned_query(
&self,
Expand Down
9 changes: 0 additions & 9 deletions src/query/src/optimizer/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use api::v1::SemanticType;
use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
Expand Down Expand Up @@ -63,14 +62,6 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}

async fn handle_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
unimplemented!()
}

async fn handle_partitioned_query(
&self,
_region_id: RegionId,
Expand Down
7 changes: 0 additions & 7 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,6 @@ pub trait RegionEngine: Send + Sync {
request: RegionRequest,
) -> Result<RegionResponse, BoxedError>;

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError>;

/// Handles query and return a scanner that can be used to scan the region concurrently.
async fn handle_partitioned_query(
&self,
Expand Down

0 comments on commit 04a15c2

Please sign in to comment.