Skip to content

Commit

Permalink
fix: logs might be missed during RegionBased replay in the WAL based …
Browse files Browse the repository at this point in the history
…on local disk (#1570)

## Rationale
In RegionBased replay, a batch of logs is first scanned from the WAL,
and then replayed on various tables using multiple threads. This
approach works fine for WALs based on tables, as the logs for each table
are clustered together. However, in a WAL based on local disk, the logs
for each table may be scattered across different positions within the
batch. During multi-threaded replay, it is possible that for a given
table, log2 is replayed before log1, resulting in missed logs.

## Detailed Changes
1. Modify the `split_log_batch_by_table` function to aggregate all logs for
a table together.
2. Modify the `TableBatch` struct to replace its single `Range` field with a
`Vec<Range>`.

## Test Plan
Manual testing.
  • Loading branch information
dracoooooo authored Sep 20, 2024
1 parent 42baf2e commit 559dfce
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions src/analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ impl RegionBasedReplay {
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}
let log_entries: Vec<_> = table_batch
.ranges
.iter()
.flat_map(|range| log_batch.range(range.clone()))
.collect();

let serial_exec_ctxs = serial_exec_ctxs.clone();
replay_tasks.push(async move {
Expand All @@ -405,7 +410,7 @@ impl RegionBasedReplay {
context.max_retry_flush_limit,
&mut ctx.serial_exec,
&ctx.table_data,
log_batch.range(table_batch.range),
log_entries.into_iter(),
)
.await;
(table_batch.table_id, Some(result))
Expand Down Expand Up @@ -439,13 +444,14 @@ impl RegionBasedReplay {

// Split log batch by table id, for example:
// input batch:
// |1|1|2|2|2|3|3|3|3|
// |1|1|2|2|2|3|3|3|3|1|1|
//
// output batches:
// |1|1|, |2|2|2|, |3|3|3|3|
// |1|1|1|1|, |2|2|2|, |3|3|3|3|
let mut start_log_idx = 0usize;
let mut curr_log_idx = 0usize;
let mut start_table_id = log_batch.get(start_log_idx).unwrap().table_id;
let mut table_ranges = HashMap::new();
loop {
let time_to_break = curr_log_idx == log_batch.len();
let found_end_idx = if time_to_break {
Expand All @@ -456,10 +462,10 @@ impl RegionBasedReplay {
};

if found_end_idx {
table_batches.push(TableBatch {
table_id: TableId::new(start_table_id),
range: start_log_idx..curr_log_idx,
});
table_ranges
.entry(TableId::new(start_table_id))
.or_insert(Vec::new())
.push(start_log_idx..curr_log_idx);

// Step to next start idx.
start_log_idx = curr_log_idx;
Expand All @@ -476,13 +482,16 @@ impl RegionBasedReplay {
}
curr_log_idx += 1;
}
for (table_id, ranges) in table_ranges {
table_batches.push(TableBatch { table_id, ranges });
}
}
}

/// Replay work for a single table within one scanned log batch.
///
/// `ranges` holds every index range in the batch that belongs to this table.
/// A `Vec` is needed (rather than a single `Range`) because in a local-disk
/// based WAL the logs of one table may be scattered across the batch; keeping
/// all of the table's ranges in one `TableBatch` lets the table be replayed
/// by a single task in scan order instead of racing across threads.
#[derive(Debug, Eq, PartialEq)]
struct TableBatch {
    /// Id of the table these log ranges belong to.
    table_id: TableId,
    /// Index ranges into the scanned log batch, in scan order.
    ranges: Vec<Range<usize>>,
}

struct SerialExecContext<'a> {
Expand Down Expand Up @@ -622,6 +631,7 @@ mod tests {
}
}

#[allow(clippy::single_range_in_vec_init)]
fn test_set() -> Vec<(VecDeque<LogEntry<u32>>, Vec<TableBatch>)> {
let test_log_batch1: VecDeque<LogEntry<u32>> = VecDeque::from([
LogEntry {
Expand Down Expand Up @@ -658,15 +668,15 @@ mod tests {
let expected1 = vec![
TableBatch {
table_id: TableId::new(0),
range: 0..3,
ranges: vec![0..3],
},
TableBatch {
table_id: TableId::new(1),
range: 3..5,
ranges: vec![3..5],
},
TableBatch {
table_id: TableId::new(2),
range: 5..6,
ranges: vec![5..6],
},
];

Expand All @@ -677,7 +687,7 @@ mod tests {
}]);
let expected2 = vec![TableBatch {
table_id: TableId::new(0),
range: 0..1,
ranges: vec![0..1],
}];

let test_log_batch3: VecDeque<LogEntry<u32>> = VecDeque::default();
Expand All @@ -693,6 +703,8 @@ mod tests {
/// Runs `split_log_batch_by_table` on `batch` and asserts its output equals
/// `expected`, ignoring the (unordered) production order of the batches.
fn check_split_result(batch: &VecDeque<LogEntry<u32>>, expected: &[TableBatch]) {
    let mut actual = Vec::new();
    RegionBasedReplay::split_log_batch_by_table(batch, &mut actual);
    // The splitter collects ranges via a HashMap, so its output order is
    // unspecified; normalize by table id before comparing.
    actual.sort_by_key(|item| item.table_id);
    assert_eq!(&actual, expected);
}
}

0 comments on commit 559dfce

Please sign in to comment.