Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds RegionScanner trait #3948

Merged
merged 16 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
derive_builder = "0.12"
Expand Down
5 changes: 2 additions & 3 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
Expand All @@ -32,7 +31,7 @@ use query::query_engine::DescribeResult;
use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -193,7 +192,7 @@ impl RegionEngine for MockRegionEngine {
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
) -> Result<RegionScannerRef, BoxedError> {
unimplemented!()
}

Expand Down
30 changes: 21 additions & 9 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
Expand All @@ -49,6 +51,20 @@ impl FileRegionEngine {
inner: Arc::new(EngineInner::new(object_store)),
}
}

async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
}
}

#[async_trait]
Expand All @@ -72,14 +88,10 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
Expand Down
26 changes: 19 additions & 7 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -155,16 +157,14 @@ impl RegionEngine for MetricEngine {
})
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

/// Retrieves region's metadata.
Expand Down Expand Up @@ -251,6 +251,18 @@ impl MetricEngine {
.logical_regions(physical_region_id)
.await
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}
}

struct MetricEngineInner {
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl MetricEngineInner {
.start_timer();

self.mito
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.context(MitoReadOperationSnafu)
}
Expand All @@ -82,7 +82,7 @@ impl MetricEngineInner {
.transform_request(physical_region_id, logical_region_id, request)
.await?;
self.mito
.handle_query(data_region_id, request)
.scan_to_stream(data_region_id, request)
.await
.context(MitoReadOperationSnafu)
}
Expand Down
8 changes: 4 additions & 4 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl MetadataRegion {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand All @@ -317,7 +317,7 @@ impl MetadataRegion {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand Down Expand Up @@ -351,7 +351,7 @@ impl MetadataRegion {
};
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand Down Expand Up @@ -590,7 +590,7 @@ mod test {
let scan_req = MetadataRegion::build_read_request("test_key");
let record_batch_stream = metadata_region
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.unwrap();
let scan_result = collect(record_batch_stream).await.unwrap();
Expand Down
33 changes: 27 additions & 6 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -115,11 +115,35 @@ impl MitoEngine {
Ok(region.region_usage().await)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
pub async fn scan_to_stream(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner()
}

/// Returns a region scanner to scan the region for `request`.
async fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner().await
}

/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
Expand Down Expand Up @@ -312,16 +336,13 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
.await
.map_err(BoxedError::new)
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn test_put_after_alter() {
| | b | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/append_mode_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn test_append_mode_write_query() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_append_mode_compaction() {
// Reopens the region.
reopen_region(&engine, region_id, region_dir, false, region_opts).await;
let stream = engine
.handle_query(region_id, ScanRequest::default())
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
18 changes: 9 additions & 9 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async fn test_region_replay() {
assert_eq!(0, result.affected_rows);

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());

Expand Down Expand Up @@ -166,7 +166,7 @@ async fn test_write_query_region() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -227,7 +227,7 @@ async fn test_different_order() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------+---------------------+
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn test_different_order_and_type() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------+---------------------+
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn test_put_delete() {
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn test_delete_not_null_fields() {
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand All @@ -398,7 +398,7 @@ async fn test_delete_not_null_fields() {
// Reopen and scan again.
reopen_region(&engine, region_id, region_dir, false, HashMap::new()).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
Expand Down Expand Up @@ -447,7 +447,7 @@ async fn test_put_overwrite() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -688,7 +688,7 @@ async fn test_cache_null_primary_key() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/engine/catchup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_catchup_with_last_entry_id() {
// Scans
let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn test_catchup_without_last_entry_id() {

let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -367,7 +367,7 @@ async fn test_catchup_with_manifest_update() {

let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
Loading