From 559dfce20e40ed46577170efca02e5dd292e7cdf Mon Sep 17 00:00:00 2001
From: Draco
Date: Fri, 20 Sep 2024 14:19:21 +0800
Subject: [PATCH] fix: logs might be missed during RegionBased replay in the
 WAL based on local disk (#1570)

## Rationale
In RegionBased replay, a batch of logs is first scanned from the WAL and then replayed on the affected tables by multiple threads. This works fine for table-based WALs, since each table's logs are clustered together within the batch. In a WAL based on local disk, however, a table's logs may be scattered across different positions in the batch. During multi-threaded replay it is then possible that, for a given table, log2 is replayed before log1, so the earlier log is effectively missed.
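
To make the hazard concrete, here is a minimal, self-contained sketch; the `split_by_runs`/`split_by_table` helpers and the plain `u32` table ids are illustrative stand-ins for the engine's actual types, not code from this patch:

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Old behavior: one batch per contiguous *run* of a table id; a table whose
/// logs are scattered yields several batches, which then replay concurrently.
fn split_by_runs(batch: &[u32]) -> Vec<(u32, Range<usize>)> {
    let mut out = Vec::new();
    let mut start = 0;
    for idx in 1..=batch.len() {
        if idx == batch.len() || batch[idx] != batch[start] {
            out.push((batch[start], start..idx));
            start = idx;
        }
    }
    out
}

/// New behavior: all runs of a table id are aggregated under one key, so each
/// table's logs are replayed by a single task, in batch order.
fn split_by_table(batch: &[u32]) -> HashMap<u32, Vec<Range<usize>>> {
    let mut out: HashMap<u32, Vec<Range<usize>>> = HashMap::new();
    for (table_id, range) in split_by_runs(batch) {
        out.entry(table_id).or_default().push(range);
    }
    out
}

fn main() {
    // Table 1's logs sit at both ends of the batch, as can happen in a
    // local-disk WAL where several tables share one region.
    let batch = [1, 1, 2, 2, 2, 3, 3, 3, 3, 1, 1];
    // Old: [(1, 0..2), (2, 2..5), (3, 5..9), (1, 9..11)] -- two tasks for table 1.
    println!("{:?}", split_by_runs(&batch));
    // New: {1: [0..2, 9..11], 2: [2..5], 3: [5..9]} -- one task per table.
    println!("{:?}", split_by_table(&batch));
}
```

Under the run-based split, table 1 yields two batches that are handed to independent replay tasks, so the `9..11` run may be applied before `0..2`; with the per-table aggregation, a single task replays both runs in batch order.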
## Detailed Changes
1. Modify the `split_log_batch_by_table` function to aggregate all log ranges belonging to one table into a single `TableBatch`.
2. Modify the `TableBatch` struct, changing its single `Range<usize>` into a `Vec<Range<usize>>`.

## Test Plan
Manual testing; the unit tests for `split_log_batch_by_table` are updated to the new `ranges` representation.
---
 .../src/instance/wal_replayer.rs              | 36 ++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs
index 251797bfd0..f782895145 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -395,6 +395,11 @@ impl RegionBasedReplay {
             if failed_tables.contains_key(&table_batch.table_id) {
                 continue;
             }
+            let log_entries: Vec<_> = table_batch
+                .ranges
+                .iter()
+                .flat_map(|range| log_batch.range(range.clone()))
+                .collect();
 
             let serial_exec_ctxs = serial_exec_ctxs.clone();
             replay_tasks.push(async move {
@@ -405,7 +410,7 @@ impl RegionBasedReplay {
                         context.max_retry_flush_limit,
                         &mut ctx.serial_exec,
                         &ctx.table_data,
-                        log_batch.range(table_batch.range),
+                        log_entries.into_iter(),
                     )
                     .await;
                 (table_batch.table_id, Some(result))
@@ -439,13 +444,14 @@ impl RegionBasedReplay {
 
         // Split log batch by table id, for example:
         // input batch:
-        //  |1|1|2|2|2|3|3|3|3|
+        //  |1|1|2|2|2|3|3|3|3|1|1|
         //
         // output batches:
-        //  |1|1|, |2|2|2|, |3|3|3|3|
+        //  |1|1|1|1|, |2|2|2|, |3|3|3|3|
         let mut start_log_idx = 0usize;
         let mut curr_log_idx = 0usize;
         let mut start_table_id = log_batch.get(start_log_idx).unwrap().table_id;
+        let mut table_ranges = HashMap::new();
         loop {
             let time_to_break = curr_log_idx == log_batch.len();
             let found_end_idx = if time_to_break {
@@ -456,10 +462,10 @@
             };
 
             if found_end_idx {
-                table_batches.push(TableBatch {
-                    table_id: TableId::new(start_table_id),
-                    range: start_log_idx..curr_log_idx,
-                });
+                table_ranges
+                    .entry(TableId::new(start_table_id))
+                    .or_insert(Vec::new())
+                    .push(start_log_idx..curr_log_idx);
 
                 // Step to next start idx.
                 start_log_idx = curr_log_idx;
@@ -476,13 +482,16 @@ impl RegionBasedReplay {
             }
             curr_log_idx += 1;
         }
+        for (table_id, ranges) in table_ranges {
+            table_batches.push(TableBatch { table_id, ranges });
+        }
     }
 }
 
 #[derive(Debug, Eq, PartialEq)]
 struct TableBatch {
     table_id: TableId,
-    range: Range<usize>,
+    ranges: Vec<Range<usize>>,
 }
 
 struct SerialExecContext<'a> {
@@ -622,6 +631,7 @@ mod tests {
         }
     }
 
+    #[allow(clippy::single_range_in_vec_init)]
     fn test_set() -> Vec<(VecDeque<LogEntry<u32>>, Vec<TableBatch>)> {
         let test_log_batch1: VecDeque<LogEntry<u32>> = VecDeque::from([
             LogEntry {
@@ -658,15 +668,15 @@
         let expected1 = vec![
             TableBatch {
                 table_id: TableId::new(0),
-                range: 0..3,
+                ranges: vec![0..3],
             },
             TableBatch {
                 table_id: TableId::new(1),
-                range: 3..5,
+                ranges: vec![3..5],
             },
             TableBatch {
                 table_id: TableId::new(2),
-                range: 5..6,
+                ranges: vec![5..6],
             },
         ];
 
@@ -677,7 +687,7 @@
         }]);
         let expected2 = vec![TableBatch {
             table_id: TableId::new(0),
-            range: 0..1,
+            ranges: vec![0..1],
         }];
 
         let test_log_batch3: VecDeque<LogEntry<u32>> = VecDeque::default();
@@ -693,6 +703,8 @@
     fn check_split_result(batch: &VecDeque<LogEntry<u32>>, expected: &[TableBatch]) {
         let mut table_batches = Vec::new();
         RegionBasedReplay::split_log_batch_by_table(batch, &mut table_batches);
+        // split_log_batch_by_table returns unordered results, so sort it here.
+        table_batches.sort_by_key(|tb| tb.table_id);
         assert_eq!(&table_batches, expected);
     }
 }
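
For illustration only (not part of the patch), a sketch of the collection step the fix relies on, with a plain `Vec<u64>` of sequence numbers standing in for the engine's `VecDeque<LogEntry<_>>` and a hand-written `ranges` in place of the output of `split_log_batch_by_table`:

```rust
use std::ops::Range;

fn main() {
    // Stand-in for the scanned WAL batch: index == position, value == sequence.
    let log_batch: Vec<u64> = (0..11).collect();

    // Aggregated ranges for one table whose logs sit at both ends of the
    // batch, e.g. table 1 in |1|1|2|2|2|3|3|3|3|1|1|.
    let ranges: Vec<Range<usize>> = vec![0..2, 9..11];

    // Mirrors the patch's collection step: iterating the ranges in the order
    // they were discovered yields the table's logs in ascending batch order,
    // so an earlier log can no longer be replayed after a later one.
    let log_entries: Vec<u64> = ranges
        .iter()
        .flat_map(|range| log_batch[range.clone()].iter().copied())
        .collect();

    assert_eq!(log_entries, vec![0, 1, 9, 10]);
    println!("replay order: {log_entries:?}");
}
```

Because `split_log_batch_by_table` scans the batch left to right, each table's `ranges` vector is already in ascending order, and the whole vector is owned by exactly one replay task; both properties together restore the log1-before-log2 guarantee. Across tables the `HashMap` iteration order is arbitrary, which is why the updated `check_split_result` sorts by `table_id` before asserting.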