refactor: streamline system.temp_files table scan (#16659)
* refactor: streamline system.temp_files result set

Changed the implementation to return the result set as a stream instead
of a single block, improving responsiveness and reducing memory usage.

* chore: remove old implementation

* fix: incorrect finish of processing

Processing should not stop when only step_limit is reached.

* refactor: refine stream chunking

* refactor: extract ListerStreamSourceBuilder

* chore: rename methods

---------

Co-authored-by: Bohu <[email protected]>
dantengsky and BohuTANG authored Oct 28, 2024
1 parent 134976a commit 6f7ede7
Showing 2 changed files with 236 additions and 65 deletions.
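At its core, the change swaps a collect-everything-then-emit-one-block scan for a chunked stream. A minimal, standalone sketch of that idea using the `futures` crate (the integer range and `println!` stand in for the OpenDAL lister and `DataBlock` construction; this is not the Databend API itself):

use futures::{stream, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Stand-in for the lister over spill files.
        let entries = stream::iter(0..10_000u64);

        // Honor the query's limit, then group entries into fixed-size chunks.
        let limit = 2_500usize;
        let mut chunks = entries.take(limit).chunks(1_000);

        // Emit one small block per chunk as soon as it is ready, instead of
        // buffering every entry and returning a single large block at the end.
        while let Some(chunk) = chunks.next().await {
            println!("emit block with {} rows", chunk.len());
        }
    });
}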
1 change: 1 addition & 0 deletions src/query/storages/system/src/lib.rs
@@ -66,6 +66,7 @@ mod user_functions_table;
mod users_table;
mod util;
mod virtual_columns_table;

pub use background_jobs_table::BackgroundJobTable;
pub use background_tasks_table::BackgroundTaskTable;
pub use backtrace_table::BacktraceTable;
300 changes: 235 additions & 65 deletions src/query/storages/system/src/temp_files_table.rs
@@ -12,8 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::future::Future;
use std::sync::Arc;

use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
@@ -27,99 +27,85 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::SendableDataBlockStream;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_expression::Value;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::query_spill_prefix;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_sources::EmptySource;
use databend_common_pipeline_sources::StreamSource;
use databend_common_storage::DataOperator;
use futures::stream;
use futures::stream::Chunks;
use futures::stream::Take;
use futures::StreamExt;
use futures::TryStreamExt;
use opendal::operator_futures::FutureLister;
use opendal::Entry;
use opendal::Lister;
use opendal::Metakey;

-use crate::table::AsyncOneBlockSystemTable;
-use crate::table::AsyncSystemTable;
use crate::table::SystemTablePart;

pub struct TempFilesTable {
table_info: TableInfo,
}

#[async_trait::async_trait]
-impl AsyncSystemTable for TempFilesTable {
-const NAME: &'static str = "system.temp_files";
impl Table for TempFilesTable {
fn is_local(&self) -> bool {
// Follow the practice of `SyncOneBlockSystemTable::is_local`
false
}

fn as_any(&self) -> &dyn Any {
self
}

fn get_table_info(&self) -> &TableInfo {
&self.table_info
}

#[async_backtrace::framed]
-async fn get_full_data(
-&self,
-ctx: Arc<dyn TableContext>,
-push_downs: Option<PushDownInfo>,
-) -> Result<DataBlock> {
-let tenant = ctx.get_tenant();
-let operator = DataOperator::instance().operator();
-
-let mut temp_files_name: Vec<String> = vec![];
-let mut temp_files_content_length = vec![];
-let mut temp_files_last_modified = vec![];
-
-let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), ""));
-if let Ok(lister) = operator
-.lister_with(&location_prefix)
-.recursive(true)
-.metakey(Metakey::LastModified | Metakey::ContentLength)
-.await
-{
-let limit = push_downs.and_then(|x| x.limit).unwrap_or(usize::MAX);
-let mut lister = lister.take(limit);
-
-while let Some(entry) = lister.try_next().await? {
-let metadata = entry.metadata();
-
-if metadata.is_file() {
-temp_files_name.push(
-entry
-.path()
-.trim_start_matches(&location_prefix)
-.to_string(),
-);
-
-temp_files_last_modified
-.push(metadata.last_modified().map(|x| x.timestamp_micros()));
-temp_files_content_length.push(metadata.content_length());
-}
-}
-}
-
-let num_rows = temp_files_name.len();
-let data_block = DataBlock::new(
-vec![
-BlockEntry::new(
-DataType::String,
-Value::Scalar(Scalar::String("Spill".to_string())),
-),
-BlockEntry::new(
-DataType::String,
-Value::Column(StringType::from_data(temp_files_name)),
-),
-BlockEntry::new(
-DataType::Number(NumberDataType::UInt64),
-Value::Column(NumberType::from_data(temp_files_content_length)),
-),
-BlockEntry::new(
-DataType::Timestamp.wrap_nullable(),
-Value::Column(TimestampType::from_opt_data(temp_files_last_modified)),
-),
-],
-num_rows,
-);
-
-Ok(data_block.consume_convert_to_full())
async fn read_partitions(
&self,
_ctx: Arc<dyn TableContext>,
_push_downs: Option<PushDownInfo>,
_dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
Ok((
PartStatistics::default(),
Partitions::create(PartitionsShuffleKind::Seq, vec![Arc::new(Box::new(
SystemTablePart,
))]),
))
}

fn read_data(
&self,
ctx: Arc<dyn TableContext>,
plan: &DataSourcePlan,
pipeline: &mut Pipeline,
_put_cache: bool,
) -> Result<()> {
// avoid duplicate read in cluster mode.
if plan.parts.partitions.is_empty() {
pipeline.add_source(EmptySource::create, 1)?;
return Ok(());
}

pipeline.add_source(
|output| TempFilesTable::create_source(ctx.clone(), output, plan.push_downs.clone()),
1,
)?;

Ok(())
}
}

@@ -151,6 +143,184 @@ impl TempFilesTable {
..Default::default()
};

-AsyncOneBlockSystemTable::create(Self { table_info })
Arc::new(Self { table_info })
}

pub fn create_source(
ctx: Arc<dyn TableContext>,
output: Arc<OutputPort>,
push_downs: Option<PushDownInfo>,
) -> Result<ProcessorPtr> {
let tenant = ctx.get_tenant();
let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), ""));
let limit = push_downs.as_ref().and_then(|x| x.limit);

let operator = DataOperator::instance().operator();
let lister = operator
.lister_with(&location_prefix)
.recursive(true)
.metakey(Metakey::LastModified | Metakey::ContentLength);

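// Build a stream that turns each chunk of listed entries into a DataBlock and reports progress through the query status.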
let stream = {
let prefix = location_prefix.clone();
let mut counter = 0;
let ctx = ctx.clone();
let builder = ListerStreamSourceBuilder::with_lister_fut(lister);
builder
.limit_opt(limit)
.chunk_size(MAX_BATCH_SIZE)
.build(move |entries| {
counter += entries.len();
let block = Self::block_from_entries(&prefix, entries)?;
ctx.set_status_info(format!("{} entries processed", counter).as_str());
Ok(block)
})?
};

StreamSource::create(ctx, Some(stream), output)
}

fn build_block(
names: Vec<String>,
file_lens: Vec<u64>,
file_last_modifieds: Vec<Option<i64>>,
) -> DataBlock {
let row_number = names.len();
DataBlock::new(
vec![
BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String("Spill".to_string())),
),
BlockEntry::new(
DataType::String,
Value::Column(StringType::from_data(names)),
),
BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Column(NumberType::from_data(file_lens)),
),
BlockEntry::new(
DataType::Timestamp.wrap_nullable(),
Value::Column(TimestampType::from_opt_data(file_last_modifieds)),
),
],
row_number,
)
}

fn block_from_entries(location_prefix: &str, entries: Vec<Entry>) -> Result<DataBlock> {
let num_items = entries.len();
let mut temp_files_name: Vec<String> = Vec::with_capacity(num_items);
let mut temp_files_content_length = Vec::with_capacity(num_items);
let mut temp_files_last_modified = Vec::with_capacity(num_items);
for entry in entries {
let metadata = entry.metadata();
if metadata.is_file() {
temp_files_name.push(entry.path().trim_start_matches(location_prefix).to_string());

temp_files_last_modified
.push(metadata.last_modified().map(|x| x.timestamp_micros()));
temp_files_content_length.push(metadata.content_length());
}
}

let data_block = TempFilesTable::build_block(
temp_files_name,
temp_files_content_length,
temp_files_last_modified,
);
Ok(data_block)
}
}

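// Default number of list entries folded into a single DataBlock.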
const MAX_BATCH_SIZE: usize = 1000;

pub struct ListerStreamSourceBuilder<T>
where T: Future<Output = opendal::Result<Lister>> + Send + 'static
{
lister_fut: FutureLister<T>,
limit: Option<usize>,
chunk_size: usize,
}

impl<T> ListerStreamSourceBuilder<T>
where T: Future<Output = opendal::Result<Lister>> + Send + 'static
{
pub fn with_lister_fut(lister_fut: FutureLister<T>) -> Self {
Self {
lister_fut,
limit: None,
chunk_size: MAX_BATCH_SIZE,
}
}

pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}

pub fn limit_opt(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

pub fn chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
self
}

pub fn build(
self,
block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
) -> Result<SendableDataBlockStream> {
stream_source_from_entry_lister_with_chunk_size(
self.lister_fut,
self.limit,
self.chunk_size,
block_builder,
)
}
}

fn stream_source_from_entry_lister_with_chunk_size<T>(
lister_fut: FutureLister<T>,
limit: Option<usize>,
chunk_size: usize,
block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
) -> Result<SendableDataBlockStream>
where
T: Future<Output = opendal::Result<Lister>> + Send + 'static,
{
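// State for the unfold below: either the pending lister future, or the chunked lister it resolves to.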
enum ListerState<U: Future<Output = opendal::Result<Lister>> + Send + 'static> {
Uninitialized(FutureLister<U>),
Initialized(Chunks<Take<Lister>>),
}

let state = ListerState::<T>::Uninitialized(lister_fut);

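// Await the lister lazily on the first poll, then yield one DataBlock per chunk of entries until the listing is exhausted.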
let stream = stream::try_unfold(
(state, block_builder),
move |(mut state, mut builder)| async move {
let mut lister = {
match state {
ListerState::Uninitialized(fut) => {
let lister = fut.await?;
lister.take(limit.unwrap_or(usize::MAX)).chunks(chunk_size)
}
ListerState::Initialized(l) => l,
}
};
if let Some(entries) = lister.next().await {
let entries = entries.into_iter().collect::<opendal::Result<Vec<_>>>()?;
let data_block = builder(entries)?;
state = ListerState::Initialized(lister);
Ok(Some((data_block, (state, builder))))
} else {
Ok(None)
}
},
);

Ok(stream.boxed())
}
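
For orientation, a hedged sketch of how the pieces above fit together, written as a hypothetical helper inside this module (the function name and parameters are illustrative; the real `create_source` additionally wires in status reporting and the pipeline output port):

// Hypothetical helper: list `prefix` recursively and stream the matching
// entries back as DataBlocks of at most MAX_BATCH_SIZE rows each.
fn spill_entries_stream(
    operator: opendal::Operator,
    prefix: String,
    limit: Option<usize>,
) -> Result<SendableDataBlockStream> {
    let lister = operator
        .lister_with(&prefix)
        .recursive(true)
        .metakey(Metakey::LastModified | Metakey::ContentLength);
    let block_prefix = prefix.clone();
    ListerStreamSourceBuilder::with_lister_fut(lister)
        .limit_opt(limit)
        .chunk_size(MAX_BATCH_SIZE)
        .build(move |entries| TempFilesTable::block_from_entries(&block_prefix, entries))
}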
