feat: replay logs of different tables in parallel (#1492)
## Rationale

Related to #1466

## Detailed Changes
Replay logs of different tables in parallel
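
The change turns each table's replay into a future and drives a bounded number of them concurrently with `futures::stream::iter(..).buffer_unordered(..)`. A minimal, self-contained sketch of that pattern (not the engine code: `replay_one_table` and the table ids are hypothetical stand-ins, and only the `futures` crate is assumed):

```rust
use futures::StreamExt;

// Stand-in for `replay_table_log_entries`; the real call also takes the flusher,
// the table's serial execution context, its table data and the log range to apply.
async fn replay_one_table(table_id: u64) -> Result<(), String> {
    let _ = table_id;
    Ok(())
}

fn main() {
    futures::executor::block_on(async {
        let table_ids = vec![1_u64, 2, 3];

        // One future per table, each yielding (table_id, replay result).
        let tasks = table_ids
            .into_iter()
            .map(|id| async move { (id, replay_one_table(id).await) });

        // Drive at most 20 replay futures at a time, handling results as they complete.
        let mut results = futures::stream::iter(tasks).buffer_unordered(20);
        while let Some((table_id, ret)) = results.next().await {
            if let Err(e) = ret {
                eprintln!("replay of table {table_id} failed: {e}");
            }
        }
    });
}
```

`buffer_unordered(20)` keeps at most 20 replay futures in flight and yields `(table_id, result)` pairs in completion order, so a failed table can be recorded without blocking the others.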

## Test Plan
CI

---------

Co-authored-by: jiacai2050 <[email protected]>
Lethannn and jiacai2050 authored Mar 11, 2024
1 parent 9f166f3 commit 66d7a0d
Showing 2 changed files with 52 additions and 36 deletions.
35 changes: 22 additions & 13 deletions Cargo.lock


53 changes: 30 additions & 23 deletions src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,13 +29,14 @@ use common_types::{
     schema::{IndexInWriterSchema, Schema},
     table::ShardId,
 };
+use futures::StreamExt;
 use generic_error::BoxError;
 use lazy_static::lazy_static;
 use logger::{debug, error, info, trace, warn};
 use prometheus::{exponential_buckets, register_histogram, Histogram};
 use snafu::ResultExt;
 use table_engine::table::TableId;
-use tokio::sync::MutexGuard;
+use tokio::sync::{Mutex, MutexGuard};
 use wal::{
     log_batch::LogEntry,
     manager::{
@@ -335,6 +336,7 @@ impl RegionBasedReplay {
         let schema_provider = TableSchemaProviderAdapter {
             table_datas: table_datas_by_id.clone(),
         };
+        let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs));
         // Split and replay logs.
         loop {
             let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
@@ -352,13 +354,8 @@ impl RegionBasedReplay {
             }
 
             let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
-            Self::replay_single_batch(
-                context,
-                &log_entry_buf,
-                &mut serial_exec_ctxs,
-                failed_tables,
-            )
-            .await?;
+            Self::replay_single_batch(context, &log_entry_buf, &serial_exec_ctxs, failed_tables)
+                .await?;
         }
 
         Ok(())
@@ -367,36 +364,46 @@ impl RegionBasedReplay {
     async fn replay_single_batch(
         context: &ReplayContext,
         log_batch: &VecDeque<LogEntry<ReadPayload>>,
-        serial_exec_ctxs: &mut HashMap<TableId, SerialExecContext<'_>>,
+        serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
         failed_tables: &mut FailedTables,
     ) -> Result<()> {
         let mut table_batches = Vec::new();
         // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
-        // TODO: Replay logs of different tables in parallel.
+        let mut replay_tasks = Vec::with_capacity(table_batches.len());
         for table_batch in table_batches {
             // Some tables may have failed in previous replay, ignore them.
             if failed_tables.contains_key(&table_batch.table_id) {
                 continue;
             }
 
-            // Replay all log entries of current table.
-            // Some tables may have been moved to other shards or dropped, ignore such logs.
-            if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
-                let result = replay_table_log_entries(
-                    &context.flusher,
-                    context.max_retry_flush_limit,
-                    &mut ctx.serial_exec,
-                    &ctx.table_data,
-                    log_batch.range(table_batch.range),
-                )
-                .await;
+            let serial_exec_ctxs = serial_exec_ctxs.clone();
+            replay_tasks.push(async move {
+                // Some tables may have been moved to other shards or dropped, ignore such logs.
+                if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
+                    let result = replay_table_log_entries(
+                        &context.flusher,
+                        context.max_retry_flush_limit,
+                        &mut ctx.serial_exec,
+                        &ctx.table_data,
+                        log_batch.range(table_batch.range),
+                    )
+                    .await;
+                    (table_batch.table_id, Some(result))
+                } else {
+                    (table_batch.table_id, None)
+                }
+            });
+        }
 
+        // Run at most 20 tasks in parallel
+        let mut replay_tasks = futures::stream::iter(replay_tasks).buffer_unordered(20);
+        while let Some((table_id, ret)) = replay_tasks.next().await {
+            if let Some(Err(e)) = ret {
                 // If occur error, mark this table as failed and store the cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
-                }
+                failed_tables.insert(table_id, e);
             }
         }
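
Since the replay futures now run concurrently, the per-table contexts move from `&mut HashMap<TableId, SerialExecContext<'_>>` to a shared `Arc<Mutex<HashMap<..>>>`, and each task clones the `Arc` and locks the map to look up its own table. A minimal sketch of that sharing pattern, with simplified placeholder types (this `SerialExecContext` is a stand-in, not the engine's type) and assuming the `futures` and `tokio` crates:

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::Mutex;

type TableId = u64;

// Simplified stand-in for the engine's `SerialExecContext`.
#[derive(Default)]
struct SerialExecContext {
    replayed_entries: usize,
}

async fn replay_tables(
    ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext>>>,
    table_ids: Vec<TableId>,
) {
    let tasks: Vec<_> = table_ids
        .into_iter()
        .map(|id| {
            // Each task owns its own handle to the shared context map.
            let ctxs = ctxs.clone();
            async move {
                // Tables that were dropped or moved to another shard have no context; skip them.
                if let Some(ctx) = ctxs.lock().await.get_mut(&id) {
                    ctx.replayed_entries += 1;
                }
            }
        })
        .collect();

    // The tasks can then be driven concurrently, e.g. with `buffer_unordered` as in the sketch above.
    futures::future::join_all(tasks).await;
}

fn main() {
    let mut ctxs = HashMap::new();
    ctxs.insert(1, SerialExecContext::default());
    let ctxs = Arc::new(Mutex::new(ctxs));
    futures::executor::block_on(replay_tables(&ctxs, vec![1, 2]));
}
```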

