diff --git a/Cargo.lock b/Cargo.lock index 94c22e7a6d..43d74e7e66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8219,6 +8219,7 @@ checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" name = "wal" version = "2.0.0" dependencies = [ + "anyhow", "async-trait", "bytes_ext", "chrono", diff --git a/src/analytic_engine/src/instance/write.rs b/src/analytic_engine/src/instance/write.rs index 95f2efce4e..8a007d5c98 100644 --- a/src/analytic_engine/src/instance/write.rs +++ b/src/analytic_engine/src/instance/write.rs @@ -531,21 +531,6 @@ impl<'a> Writer<'a> { e })?; - // When wal is disabled, there is no need to do this check. - if !self.instance.disable_wal { - // NOTE: Currently write wal will only increment seq by one, - // this may change in future. - let last_seq = table_data.last_sequence(); - if sequence != last_seq + 1 { - warn!( - "Sequence must be consecutive, table:{}, table_id:{}, last_sequence:{}, wal_sequence:{}", - table_data.name,table_data.id, - table_data.last_sequence(), - sequence - ); - } - } - debug!( "Instance write finished, update sequence, table:{}, table_id:{} last_sequence:{}", table_data.name, table_data.id, sequence diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml index 0d13ef36a4..30a5b00461 100644 --- a/src/wal/Cargo.toml +++ b/src/wal/Cargo.toml @@ -47,6 +47,7 @@ name = "read_write" required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", "wal-local-storage"] [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } bytes_ext = { workspace = true } chrono = { workspace = true } diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index 8d018896e1..b3e70e9b5c 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -18,17 +18,18 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] pub struct LocalStorageConfig { - pub path: String, - pub max_segment_size: usize, + pub data_dir: String, + pub segment_size: usize, pub cache_size: usize, } impl Default for LocalStorageConfig { fn default() -> Self { Self { - path: "/tmp/horaedb".to_string(), - max_segment_size: 64 * 1024 * 1024, // 64MB + data_dir: "/tmp/horaedb".to_string(), + segment_size: 64 * 1024 * 1024, // 64MB cache_size: 3, } } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 80b3718228..02bd1d13e0 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -16,6 +16,7 @@ // under the License. use std::{ + cmp::{max, min}, collections::{HashMap, VecDeque}, fmt::Debug, fs::{self, File, OpenOptions}, @@ -56,8 +57,19 @@ pub enum Error { #[snafu(display("Failed to map file to memory: {}", source))] Mmap { source: io::Error }, - #[snafu(display("Segment full"))] - SegmentFull, + #[snafu(display( + "Segment {} is full, current size: {}, segment size: {}, try to append: {}", + id, + current_size, + segment_size, + data_size + ))] + SegmentFull { + id: u64, + current_size: usize, + segment_size: usize, + data_size: usize, + }, #[snafu(display("Failed to append data to segment file: {}", source))] SegmentAppend { source: io::Error }, @@ -115,6 +127,9 @@ pub enum Error { source: GenericError, backtrace: Backtrace, }, + + #[snafu(display("{}", source))] + Internal { source: anyhow::Error }, } define_result!(Error); @@ -158,6 +173,10 @@ pub struct Segment { /// The maximum sequence number of records within this segment. max_seq: SequenceNumber, + /// A hashmap storing both min and max sequence numbers of records within + /// this segment for each `TableId`. + table_ranges: HashMap, + /// The encoding format used for records within this segment. record_encoding: RecordEncoding, @@ -187,15 +206,16 @@ impl Segment { version: NEWEST_WAL_SEGMENT_VERSION, path: path.clone(), id: segment_id, - current_size: SEGMENT_HEADER.len(), + current_size: VERSION_SIZE + SEGMENT_HEADER.len(), segment_size, min_seq: MAX_SEQUENCE_NUMBER, max_seq: MIN_SEQUENCE_NUMBER, + table_ranges: HashMap::new(), record_encoding: RecordEncoding::newest(), mmap: None, record_position: Vec::new(), write_count: 0, - last_flushed_position: SEGMENT_HEADER.len(), + last_flushed_position: VERSION_SIZE + SEGMENT_HEADER.len(), }; if !Path::new(&path).exists() { @@ -208,9 +228,12 @@ impl Segment { return Ok(segment); } - // Open the segment file to update min and max sequence number and file size + // Open the segment file segment.open()?; + // Restore meta info + segment.restore_meta()?; + // Close the segment file. If the segment is to be used for read or write, it // will be opened again segment.close()?; @@ -218,7 +241,64 @@ impl Segment { Ok(segment) } + fn restore_meta(&mut self) -> Result<()> { + // Ensure the segment file is open + let Some(mmap) = &mut self.mmap else { + return SegmentNotOpen { id: self.id }.fail(); + }; + let file_size = mmap.len(); + + // Read and validate all records + let mut pos = VERSION_SIZE + SEGMENT_HEADER.len(); + let mut record_position = Vec::new(); + + self.table_ranges.clear(); + + // Scan all records in the segment + while pos < file_size { + let data = &mmap[pos..]; + + match self.record_encoding.decode(data).box_err() { + Ok(record) => { + record_position.push(Position { + start: pos, + end: pos + record.len(), + }); + + // Update max sequence number + self.min_seq = min(self.min_seq, record.sequence_num); + self.max_seq = max(self.max_seq, record.sequence_num); + + // Update table_ranges + self.table_ranges + .entry(record.table_id) + .and_modify(|seq_range| { + seq_range.0 = min(seq_range.0, record.sequence_num); + seq_range.1 = max(seq_range.1, record.sequence_num); + }) + .or_insert((record.sequence_num, record.sequence_num)); + + pos += record.len(); + } + Err(_) => { + // If decoding fails, we've reached the end of valid data + // TODO: too tricky, refactor later + break; + } + } + } + + self.segment_size = file_size; + self.record_position = record_position; + self.current_size = pos; + self.write_count = 0; + self.last_flushed_position = pos; + Ok(()) + } + pub fn open(&mut self) -> Result<()> { + assert!(self.mmap.is_none()); + // Open the segment file let file = OpenOptions::new() .read(true) @@ -245,50 +325,19 @@ impl Segment { let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len]; ensure!(header == SEGMENT_HEADER, InvalidHeader); - // Read and validate all records - let mut pos = VERSION_SIZE + header_len; - let mut record_position = Vec::new(); - - // Update min and max sequence number - let mut min_seq = MAX_SEQUENCE_NUMBER; - let mut max_seq = MIN_SEQUENCE_NUMBER; - - while pos < file_size as usize { - let data = &mmap[pos..]; - - match self.record_encoding.decode(data).box_err() { - Ok(record) => { - record_position.push(Position { - start: pos, - end: pos + record.len(), - }); - min_seq = min_seq.min(record.sequence_num); - max_seq = max_seq.max(record.sequence_num); - pos += record.len(); - } - Err(_) => { - // If decoding fails, we've reached the end of valid data - // TODO: too tricky, refactor later - break; - } - } - } - self.mmap = Some(mmap); - self.record_position = record_position; - self.current_size = pos; - self.write_count = 0; - self.last_flushed_position = pos; - self.min_seq = min_seq; - self.max_seq = max_seq; + Ok(()) } pub fn close(&mut self) -> Result<()> { if let Some(ref mut mmap) = self.mmap { // Flush before closing - mmap.flush_range(self.last_flushed_position, self.current_size) - .context(Flush)?; + mmap.flush_range( + self.last_flushed_position, + self.current_size - self.last_flushed_position, + ) + .context(Flush)?; // Reset the write count self.write_count = 0; // Update the last flushed position @@ -298,15 +347,16 @@ impl Segment { Ok(()) } - pub fn is_open(&self) -> bool { - self.mmap.is_some() - } - /// Append a slice to the segment file. fn append(&mut self, data: &[u8]) -> Result<()> { ensure!( self.current_size + data.len() <= self.segment_size, - SegmentFull + SegmentFull { + id: self.id, + current_size: self.current_size, + segment_size: self.segment_size, + data_size: data.len() + } ); // Ensure the segment file is open @@ -324,8 +374,11 @@ impl Segment { // Only flush if the write_count reaches FLUSH_INTERVAL if self.write_count >= FLUSH_INTERVAL { - mmap.flush_range(self.last_flushed_position, self.current_size + data.len()) - .context(Flush)?; + mmap.flush_range( + self.last_flushed_position, + self.current_size + data.len() - self.last_flushed_position, + ) + .context(Flush)?; // Reset the write count self.write_count = 0; // Update the last flushed position @@ -360,6 +413,7 @@ impl Segment { &mut self, data: &[u8], positions: &mut Vec, + table_id: TableId, prev_sequence_num: SequenceNumber, next_sequence_num: SequenceNumber, ) -> Result<()> { @@ -370,9 +424,44 @@ impl Segment { self.record_position.append(positions); // Update min and max sequence number - self.min_seq = self.min_seq.min(prev_sequence_num); - self.max_seq = self.max_seq.max(next_sequence_num - 1); + self.min_seq = min(self.min_seq, prev_sequence_num); + self.max_seq = max(self.max_seq, next_sequence_num - 1); + + // Update sequence range + self.table_ranges + .entry(table_id) + .and_modify(|seq_range| { + seq_range.0 = min(seq_range.0, prev_sequence_num); + seq_range.1 = max(seq_range.1, next_sequence_num - 1); + }) + .or_insert((prev_sequence_num, next_sequence_num - 1)); + + Ok(()) + } + + fn mark_deleted(&mut self, table_id: TableId, sequence_num: SequenceNumber) { + if let Some(range) = self.table_ranges.get_mut(&table_id) { + // If sequence number is MAX, remove the range directly to prevent overflow + if sequence_num == MAX_SEQUENCE_NUMBER { + self.table_ranges.remove(&table_id); + return; + } + range.0 = max(range.0, sequence_num + 1); + if range.0 > range.1 { + self.table_ranges.remove(&table_id); + } + } + } + + fn is_empty(&self) -> bool { + self.table_ranges.is_empty() + } + fn delete(&mut self) -> Result<()> { + self.close()?; + fs::remove_file(&self.path) + .map_err(anyhow::Error::new) + .context(Internal)?; Ok(()) } } @@ -383,10 +472,13 @@ pub struct SegmentManager { all_segments: Mutex>>>, /// Cache for opened segments - cache: Mutex>, + cache: Mutex>)>>, /// Maximum size of the cache cache_size: usize, + + /// The latest segment for appending logs + current_segment: Mutex>>, } impl SegmentManager { @@ -396,63 +488,129 @@ impl SegmentManager { Ok(()) } - /// Obtain the target segment - fn get_segment(&self, segment_id: u64) -> Result>> { - let all_segments = self.all_segments.lock().unwrap(); - - let segment = all_segments.get(&segment_id); - - let segment = match segment { - Some(segment) => segment, - None => return SegmentNotFound { id: segment_id }.fail(), - }; - - Ok(segment.clone()) - } - /// Open segment if it is not in cache, need to acquire the lock outside - fn open_segment(&self, segment: &mut Segment) -> Result<()> { + /// otherwise the segment may get closed again. + fn open_segment(&self, guard: &mut Segment, segment: Arc>) -> Result<()> { + if guard.mmap.is_some() { + return Ok(()); + } + let mut cache = self.cache.lock().unwrap(); - // Check if segment is already in cache - if cache.iter().any(|id| *id == segment.id) { + let already_opened = cache.iter().any(|(id, _)| *id == guard.id); + if already_opened { return Ok(()); } - // If not in cache, load from disk - segment.open()?; + guard.open()?; - // Add to cache + // Try evicting the oldest segment if the cache is full if cache.len() == self.cache_size { - let evicted_segment_id = cache.pop_front(); - if let Some(evicted_segment_id) = evicted_segment_id { + let evicted_segment = cache.pop_front(); + if let Some((_, evicted_segment)) = evicted_segment { // The evicted segment should be closed first - let evicted_segment = self.get_segment(evicted_segment_id)?; let mut evicted_segment = evicted_segment.lock().unwrap(); evicted_segment.close()?; } } - cache.push_back(segment.id); + cache.push_back((guard.id, segment.clone())); + Ok(()) + } + pub fn close_all(&self) -> Result<()> { + { + let mut cache = self.cache.lock().unwrap(); + cache.clear(); + } + let all_segments = self.all_segments.lock().unwrap(); + for segment in all_segments.values() { + segment.lock().unwrap().close()?; + } Ok(()) } pub fn mark_delete_entries_up_to( &self, - _location: WalLocation, - _sequence_num: SequenceNumber, + location: WalLocation, + sequence_num: SequenceNumber, ) -> Result<()> { - todo!() + let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; + let mut all_segments = self.all_segments.lock().unwrap(); + let mut segments_to_remove = Vec::new(); + + for (_, segment) in all_segments.iter() { + let mut guard = segment.lock().unwrap(); + + guard.mark_deleted(location.table_id, sequence_num); + + // Delete this segment if it is empty and its id is less than the current + // segment + if guard.is_empty() && guard.id < current_segment_id { + let mut cache = self.cache.lock().unwrap(); + + // Check if segment is already in cache + if let Some(index) = cache.iter().position(|(id, _)| *id == guard.id) { + cache.remove(index); + } + + segments_to_remove.push((guard.id, segment.clone())); + } + } + + // Delete segments in all_segments + for (segment_id, _) in segments_to_remove.iter() { + all_segments.remove(segment_id); + } + + drop(all_segments); + + // Delete segments on disk + for (_, segment) in segments_to_remove.iter() { + segment.lock().unwrap().delete()?; + } + + Ok(()) } - pub fn close_all(&self) -> Result<()> { - let mut cache = self.cache.lock().unwrap(); - cache.clear(); + pub fn get_relevant_segments( + &self, + table_id: Option, + start: SequenceNumber, + end: SequenceNumber, + ) -> Result>>> { + // Find all segments that contain the requested sequence numbers + let mut relevant_segments = Vec::new(); + let all_segments = self.all_segments.lock().unwrap(); + for segment in all_segments.values() { - segment.lock().unwrap().close()?; + let guard = segment.lock().unwrap(); + match table_id { + Some(table_id) => { + if let Some(range) = guard.table_ranges.get(&table_id) { + if range.0 <= end && range.1 >= start { + relevant_segments.push((guard.id, segment.clone())); + } + } + } + None => { + if guard.min_seq <= end && guard.max_seq >= start { + relevant_segments.push((guard.id, segment.clone())); + } + } + } } - Ok(()) + + // Sort by segment id + relevant_segments.sort_by_key(|(id, _)| *id); + + // id is not needed, so remove it + let relevant_segments = relevant_segments + .into_iter() + .map(|(_, segment)| segment) + .collect(); + + Ok(relevant_segments) } } @@ -482,9 +640,6 @@ pub struct Region { /// Sequence number for the next log next_sequence_num: AtomicU64, - - /// The latest segment for appending logs - current_segment: Mutex>>, } impl Region { @@ -557,6 +712,7 @@ impl Region { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), cache_size, + current_segment: Mutex::new(latest_segment), }; Ok(Self { @@ -568,17 +724,29 @@ impl Region { region_dir, next_sequence_num: AtomicU64::new(next_sequence_num), runtime, - current_segment: Mutex::new(latest_segment), }) } + fn create_new_segment(&self, id: u64) -> Result>> { + // Create a new segment + let new_segment = Segment::new( + format!("{}/segment_{}.wal", self.region_dir, id), + id, + self.segment_size, + )?; + let new_segment = Arc::new(Mutex::new(new_segment)); + self.segment_manager.add_segment(id, new_segment.clone())?; + + Ok(new_segment) + } + pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { // In the WAL based on local storage, we need to ensure the sequence number in // segment is monotonically increasing. So we need to acquire a lock here. // Perhaps we could avoid acquiring the lock here and instead allocate the // position that needs to be written in the segment, then fill it within // spawn_blocking. However, I’m not sure about the correctness of this approach. - let mut current_segment = self.current_segment.lock().unwrap(); + let mut current_segment = self.segment_manager.current_segment.lock().unwrap(); let entries_num = batch.len() as u64; let table_id = batch.location.table_id; @@ -608,35 +776,23 @@ impl Region { next_sequence_num += 1; } - let guard = current_segment.lock().unwrap(); - // Check if the current segment has enough space for the new data // If not, create a new segment and update current_segment - if guard.current_size + data.len() > guard.segment_size { - let new_segment_id = guard.id + 1; - // We need to drop guard to allow the update of current_segment - drop(guard); - - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, new_segment_id), - new_segment_id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager - .add_segment(new_segment_id, new_segment.clone())?; + { + let guard = current_segment.lock().unwrap(); + if guard.current_size + data.len() > guard.segment_size { + let new_segment_id = guard.id + 1; + drop(guard); - // Update current segment - *current_segment = new_segment; - } else { - drop(guard); + *current_segment = self.create_new_segment(new_segment_id)?; + } } + // Open the segment if not opened let mut guard = current_segment.lock().unwrap(); + self.segment_manager + .open_segment(&mut guard, current_segment.clone())?; - // Open the segment if not opened - self.segment_manager.open_segment(&mut guard)?; for pos in record_position.iter_mut() { pos.start += guard.current_size; pos.end += guard.current_size; @@ -646,8 +802,9 @@ impl Region { guard.append_records( &data, &mut record_position, + table_id, prev_sequence_num, - next_sequence_num - 1, + next_sequence_num, )?; Ok(next_sequence_num - 1) } @@ -742,6 +899,8 @@ impl RegionManager { segment_size: usize, runtime: Arc, ) -> Result { + fs::create_dir_all(&root_dir).context(DirOpen)?; + let mut regions = HashMap::new(); // Naming conversion: / @@ -864,6 +1023,9 @@ struct SegmentLogIterator { /// Optional identifier for the table, which is used to filter logs. table_id: Option, + /// A hashmap of start and end sequence number for each table. + table_ranges: HashMap, + /// Starting sequence number for log iteration. start: SequenceNumber, @@ -887,17 +1049,12 @@ impl SegmentLogIterator { start: SequenceNumber, end: SequenceNumber, ) -> Result { + let mut guard = segment.lock().unwrap(); // Open the segment if it is not open - let mut segment = segment.lock().unwrap(); - if !segment.is_open() { - segment_manager.open_segment(&mut segment)?; - } - - // Read the entire content of the segment - let segment_content = segment.read(0, segment.current_size)?; - - // Get record positions - let record_positions = segment.record_position.clone(); + segment_manager.open_segment(&mut guard, segment.clone())?; + let segment_content = guard.read(0, guard.current_size)?; + let record_positions = guard.record_position.clone(); + let table_ranges = guard.table_ranges.clone(); Ok(Self { log_encoding, @@ -905,6 +1062,7 @@ impl SegmentLogIterator { segment_content, record_positions, table_id, + table_ranges, start, end, current_record_idx: 0, @@ -952,6 +1110,15 @@ impl SegmentLogIterator { } } + // Filter by sequence range + if let Some((start, end)) = &self.table_ranges.get(&record.table_id) { + if record.sequence_num < *start || record.sequence_num > *end { + continue; + } + } else { + continue; + } + // Decode the value let value = self .log_encoding @@ -975,7 +1142,7 @@ pub struct MultiSegmentLogIterator { segment_manager: Arc, /// All segments involved in this read operation. - segments: Vec, + segments: Vec>>, /// Current segment index. current_segment_idx: usize, @@ -1011,22 +1178,7 @@ impl MultiSegmentLogIterator { start: SequenceNumber, end: SequenceNumber, ) -> Result { - // Find all segments that contain the requested sequence numbers - let mut relevant_segments = Vec::new(); - - { - let all_segments = segment_manager.all_segments.lock().unwrap(); - - for (_, segment) in all_segments.iter() { - let segment = segment.lock().unwrap(); - if segment.min_seq <= end && segment.max_seq >= start { - relevant_segments.push(segment.id); - } - } - } - - // Sort by segment id - relevant_segments.sort_unstable(); + let relevant_segments = segment_manager.get_relevant_segments(table_id, start, end)?; let mut iter = Self { segment_manager, @@ -1053,8 +1205,7 @@ impl MultiSegmentLogIterator { return Ok(false); } - let segment = self.segments[self.current_segment_idx]; - let segment = self.segment_manager.get_segment(segment)?; + let segment = self.segments[self.current_segment_idx].clone(); let iterator = SegmentLogIterator::new( self.log_encoding.clone(), self.record_encoding.clone(), @@ -1129,7 +1280,7 @@ mod tests { assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION); assert_eq!(segment.path, path); assert_eq!(segment.id, 0); - assert_eq!(segment.current_size, SEGMENT_HEADER.len()); + assert_eq!(segment.current_size, SEGMENT_HEADER.len() + VERSION_SIZE); let segment_content = fs::read(path).unwrap(); assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION); @@ -1153,6 +1304,10 @@ mod tests { segment .open() .expect("Expected to open segment successfully"); + + segment + .restore_meta() + .expect("Expected to restore meta successfully"); } #[test] @@ -1166,6 +1321,7 @@ mod tests { .to_string(); let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); segment.open().unwrap(); + segment.restore_meta().unwrap(); let data = b"test_data"; let append_result = segment.append(data); @@ -1176,6 +1332,26 @@ mod tests { assert_eq!(read_result.unwrap(), data); } + #[test] + fn test_segment_delete() { + let dir = tempdir().unwrap(); + + let path = dir + .path() + .join("segment_0.wal") + .to_str() + .unwrap() + .to_string(); + + let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); + + segment.open().unwrap(); + assert!(Path::new(&path).exists()); + + segment.delete().unwrap(); + assert!(!Path::new(&path).exists()); + } + #[test] fn test_region_create_and_close() { let dir = tempdir().unwrap(); @@ -1190,8 +1366,6 @@ mod tests { ) .unwrap(); - let _segment = region.segment_manager.get_segment(0).unwrap(); - region.close().unwrap() } @@ -1300,4 +1474,70 @@ mod tests { let runtime = Arc::new(Builder::default().build().unwrap()); runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone())); } + + #[test] + fn test_region_mark_delete_entries_up_to() { + const SEGMENT_SIZE: usize = 4096; + + let dir = tempdir().unwrap(); + let runtime = Arc::new(Builder::default().build().unwrap()); + + // Create a new region + let region = Region::new( + 1, + 2, + SEGMENT_SIZE, + dir.path().to_str().unwrap().to_string(), + runtime.clone(), + ) + .unwrap(); + let region = Arc::new(region); + + // Write some log entries + let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1 + let mut sequence = MIN_SEQUENCE_NUMBER + 1; + + for _i in 0..10 { + let log_entries = 0..100; + let log_batch_encoder = LogBatchEncoder::create(location); + let log_batch = log_batch_encoder + .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: v })) + .expect("should succeed to encode payloads"); + + let write_ctx = WriteContext::default(); + let actual_sequence = region.write(&write_ctx, &log_batch).unwrap(); + assert_eq!(actual_sequence, sequence + 100 - 1); + sequence += 100; + } + + let latest_segment_id = { + let all_segments = region.segment_manager.all_segments.lock().unwrap(); + // Expect more than one segment + assert!( + all_segments.len() > 1, + "Expected multiple segments, but got {}", + all_segments.len() + ); + all_segments.keys().max().unwrap().to_owned() + }; + + // Mark delete entries up to sequence - 1, so only the last segment should + // remain + let mark_delete_sequence = sequence - 1; + region + .mark_delete_entries_up_to(location, mark_delete_sequence) + .unwrap(); + + { + let all_segments = region.segment_manager.all_segments.lock().unwrap(); + assert_eq!(all_segments.len(), 1); + assert!(all_segments.contains_key(&latest_segment_id)); + } + + // The num of segment in the dir should be 1 + let segment_count = fs::read_dir(dir.path()).unwrap().count(); + assert_eq!(segment_count, 1); + + region.close().unwrap(); + } } diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs index 91c69fcd5a..694831eae1 100644 --- a/src/wal/src/local_storage_impl/wal_manager.rs +++ b/src/wal/src/local_storage_impl/wal_manager.rs @@ -54,14 +54,14 @@ impl LocalStorageImpl { ) -> Result { let LocalStorageConfig { cache_size, - max_segment_size, + segment_size, .. } = config.clone(); let wal_path_str = wal_path.to_str().unwrap().to_string(); let region_manager = RegionManager::new( wal_path_str.clone(), cache_size, - max_segment_size, + segment_size, runtime.clone(), ) .box_err() @@ -104,6 +104,10 @@ impl WalManager for LocalStorageImpl { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { + debug!( + "Mark delete entries up to {} for location:{:?}", + sequence_num, location + ); self.region_manager .mark_delete_entries_up_to(location, sequence_num) .box_err() @@ -111,10 +115,7 @@ impl WalManager for LocalStorageImpl { } async fn close_region(&self, region_id: RegionId) -> Result<()> { - debug!( - "Close region for LocalStorage based WAL is noop operation, region_id:{}", - region_id - ); + debug!("Close region {} for LocalStorage based WAL", region_id); self.region_manager .close(region_id) .box_err() @@ -133,10 +134,15 @@ impl WalManager for LocalStorageImpl { ctx: &ReadContext, req: &ReadRequest, ) -> Result { + debug!( + "Read batch from LocalStorage based WAL, ctx:{:?}, req:{:?}", + ctx, req + ); self.region_manager.read(ctx, req).box_err().context(Read) } async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result { + debug!("Write batch to LocalStorage based WAL, ctx:{:?}", ctx); self.region_manager .write(ctx, batch) .box_err() @@ -144,6 +150,10 @@ impl WalManager for LocalStorageImpl { } async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result { + debug!( + "Scan from LocalStorage based WAL, ctx:{:?}, req:{:?}", + ctx, req + ); self.region_manager.scan(ctx, req).box_err().context(Read) } @@ -181,7 +191,7 @@ impl WalsOpener for LocalStorageWalsOpener { }; let write_runtime = runtimes.write_runtime.clone(); - let data_path = Path::new(&local_storage_wal_config.path); + let data_path = Path::new(&local_storage_wal_config.data_dir); let data_wal = if config.disable_data { Arc::new(crate::dummy::DoNothing) diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs index cccde53cfc..24c4c75f5b 100644 --- a/src/wal/tests/read_write.rs +++ b/src/wal/tests/read_write.rs @@ -72,7 +72,6 @@ fn test_kafka_wal() { } #[test] -#[ignore = "this test cannot pass completely, since delete is not supported yet"] fn test_local_storage_wal() { let builder = LocalStorageWalBuilder; test_all(builder, false); @@ -997,7 +996,7 @@ impl WalBuilder for LocalStorageWalBuilder { async fn build(&self, data_path: &Path, runtime: Arc) -> Arc { let config = LocalStorageConfig { - path: data_path.to_str().unwrap().to_string(), + data_dir: data_path.to_str().unwrap().to_string(), ..LocalStorageConfig::default() }; Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config, runtime).unwrap())