Skip to content

Commit

Permalink
refactor PutPayload
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Nov 29, 2023
1 parent 62b6df8 commit 424dd46
Showing 1 changed file with 46 additions and 44 deletions.
90 changes: 46 additions & 44 deletions quickwit/quickwit-storage/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{self, SeekFrom};
use std::ops::Range;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -107,9 +106,11 @@ impl PutPayload for FilePayload {
}

/// SplitPayloadBuilder is used to create a `SplitPayload`.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct SplitPayloadBuilder {
metadata: BundleStorageFileOffsets,
/// File name, payload, and range of the payload in the bundle file.
/// The range could be computed on the fly, and is just kept here for convenience.
payloads: Vec<(String, Box<dyn PutPayload>, Range<u64>)>,
current_offset: usize,
}

Expand All @@ -124,78 +125,79 @@ impl SplitPayloadBuilder {
for file in split_files {
split_payload_builder.add_file(file)?;
}
let offsets = split_payload_builder.finalize(serialized_split_fields, hotcache)?;
split_payload_builder.add_payload(
SPLIT_FIELDS_FILE_NAME.to_string(),
Box::new(serialized_split_fields.to_vec()),
);
let offsets = split_payload_builder.finalize(hotcache)?;
Ok(offsets)
}

/// Appends `payload` to the bundle under the name `file_name`.
///
/// The payload's byte range starts at the current offset and spans `payload.len()` bytes;
/// the current offset is then advanced past it, so the next payload follows directly.
pub fn add_payload(&mut self, file_name: String, payload: Box<dyn PutPayload>) {
    let payload_len = payload.len();
    let start_offset = self.current_offset as u64;
    let end_offset = start_offset + payload_len;
    self.current_offset += payload_len as usize;
    self.payloads.push((file_name, payload, start_offset..end_offset));
}

/// Adds the file to the bundle file.
///
/// The hotcache needs to be the last file that is added, in order to be able to read
/// the hotcache and the metadata in one continuous read.
pub fn add_file(&mut self, path: &Path) -> io::Result<()> {
let file = std::fs::metadata(path)?;
let file_range = self.current_offset as u64..self.current_offset as u64 + file.len();
self.current_offset += file.len() as usize;
self.metadata.files.insert(path.to_owned(), file_range);
let file_name = path
.file_name()
.and_then(std::ffi::OsStr::to_str)
.map(ToOwned::to_owned)
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid file name in path {:?}", path),
)
})?;

let file_payload = FilePayload {
path: path.to_owned(),
len: file.len(),
};

self.add_payload(file_name, Box::new(file_payload));

Ok(())
}

/// Builds the footer (the bundle file offsets metadata followed by the hotcache), appends it
/// after the other payloads, and returns the resulting `SplitPayload`.
pub fn finalize(
mut self,
serialized_split_fields: &[u8],
hotcache: &[u8],
) -> anyhow::Result<SplitPayload> {
// let split_fields = serialize_split_fields(fields_metadata);
pub fn finalize(self, hotcache: &[u8]) -> anyhow::Result<SplitPayload> {
// Add the fields metadata to the bundle metadata.
// Build the footer.
let mut footer_bytes = Vec::new();
// Fix paths to be relative
let mut metadata_with_fixed_paths = self
.metadata
.files
let metadata_with_fixed_paths = self
.payloads
.iter()
.map(|(path, range)| {
let file_name = path.file_name().ok_or_else(|| {
anyhow::anyhow!("could not extract file_name from path {path:?}")
})?;
.map(|(file_name, _, range)| {
let file_name = PathBuf::from(file_name);
Ok((file_name, range.start..range.end))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;

// Add Split fields to the bundle metadata.
let list_fields_data_range = self.current_offset as u64
..self.current_offset as u64 + serialized_split_fields.len() as u64;
self.current_offset += serialized_split_fields.len();
metadata_with_fixed_paths.insert(SPLIT_FIELDS_FILE_NAME.into(), list_fields_data_range);

let bundle_storage_file_offsets = BundleStorageFileOffsets {
files: metadata_with_fixed_paths,
};
let metadata_json =
BundleStorageFileOffsetsVersions::serialize(&bundle_storage_file_offsets);

// The hotcache needs to be right next to the metadata so that both can be read
// in one continuous read.
let mut footer_bytes = Vec::new();
footer_bytes.extend(&metadata_json);
footer_bytes.extend((metadata_json.len() as u32).to_le_bytes());
footer_bytes.extend(hotcache);
footer_bytes.extend((hotcache.len() as u32).to_le_bytes());

let mut payloads: Vec<Box<dyn PutPayload>> = Vec::new();

let mut sorted_files = self.metadata.files.iter().collect::<Vec<_>>();
sorted_files.sort_by_key(|(_file, range)| range.start);

for (path, byte_range) in sorted_files {
let file_payload = FilePayload {
path: path.to_owned(),
len: byte_range.end - byte_range.start,
};
payloads.push(Box::new(file_payload));
}
let mut payloads: Vec<Box<dyn PutPayload>> = self
.payloads
.into_iter()
.map(|(_, payload, _)| payload)
.collect();

payloads.push(Box::new(serialized_split_fields.to_vec()));
payloads.push(Box::new(footer_bytes.to_vec()));

Ok(SplitPayload {
Expand Down

0 comments on commit 424dd46

Please sign in to comment.