Skip to content

Commit

Permalink
store split_fields in split (#4190)
Browse files Browse the repository at this point in the history
* store split_fields in split

* add header with version to split fields

* refactor

* add test

* refactor PutPayload
  • Loading branch information
PSeitz authored Dec 4, 2023
1 parent b31bf72 commit 73fde83
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 40 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ vrl = { version = "0.8.1", default-features = false, features = [
warp = "0.3"
whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" }
wiremock = "0.5"
zstd = "0.13.0"

aws-config = "0.55.0"
aws-credential-types = { version = "0.55.0", features = [
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
7 changes: 7 additions & 0 deletions quickwit/quickwit-directories/src/bundle_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {
use std::fs::File;
use std::io::Write;

use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME;
use quickwit_storage::{PutPayload, SplitPayloadBuilder};

use super::*;
Expand All @@ -182,6 +183,7 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[],
&[
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
],
Expand Down Expand Up @@ -213,6 +215,7 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[],
&[
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
],
Expand Down Expand Up @@ -251,13 +254,17 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[5, 5, 5],
&[1, 2, 3],
)?;

let data = split_streamer.read_all().await?;

let bundle_dir = BundleDirectory::open_split(FileSlice::from(data.to_vec()))?;

let field_data = bundle_dir.atomic_read(Path::new(SPLIT_FIELDS_FILE_NAME))?;
assert_eq!(&*field_data, &[5, 5, 5]);

let f1_data = bundle_dir.atomic_read(Path::new("f1"))?;
assert_eq!(&*f1_data, &[123u8, 76u8]);

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
vrl = { workspace = true, optional = true }
zstd = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-aws = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ mod tests {
let split_store = {
let mut storage_builder = RamStorageBuilder::default();
for split in &splits_to_merge {
let buffer = SplitPayloadBuilder::get_split_payload(&[], &[1, 2, 3])?
let buffer = SplitPayloadBuilder::get_split_payload(&[], &[], &[1, 2, 3])?
.read_all()
.await?;
storage_builder = storage_builder.put(&split_file(split.split_id()), &buffer);
Expand Down
9 changes: 7 additions & 2 deletions quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite")

use crate::actors::Uploader;
use crate::models::{
EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch,
serialize_split_fields, EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit,
PackagedSplitBatch,
};

/// The role of the packager is to get an index writer and
Expand Down Expand Up @@ -186,7 +187,6 @@ impl Handler<EmptySplit> for Packager {
}
}

/// returns true iff merge is required to reach a state where
fn list_split_files(
segment_metas: &[SegmentMeta],
scratch_directory: &TempDirectory,
Expand Down Expand Up @@ -287,6 +287,9 @@ fn create_packaged_split(
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;

let fields_metadata = split.index.fields_metadata()?;

let mut tags = BTreeSet::default();
for named_field in tag_fields {
let inverted_indexes = index_reader
Expand All @@ -312,8 +315,10 @@ fn create_packaged_split(
let mut hotcache_bytes = Vec::new();
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
ctx.record_progress();
let serialized_split_fields = serialize_split_fields(&fields_metadata);

let packaged_split = PackagedSplit {
serialized_split_fields,
split_attrs: split.split_attrs,
split_scratch_directory: split.split_scratch_directory,
tags,
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Handler<PackagedSplitBatch> for Uploader {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&packaged_split.split_files,
&packaged_split.serialized_split_fields,
&packaged_split.hotcache_bytes,
)?;
let split_metadata = create_split_metadata(
Expand Down Expand Up @@ -465,6 +466,7 @@ async fn upload_split(
) -> anyhow::Result<()> {
let split_streamer = SplitPayloadBuilder::get_split_payload(
&packaged_split.split_files,
&packaged_split.serialized_split_fields,
&packaged_split.hotcache_bytes,
)?;

Expand Down Expand Up @@ -561,6 +563,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down Expand Up @@ -672,6 +675,7 @@ mod tests {
delete_opstamp: 0,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_1,
tags: Default::default(),
split_files: Vec::new(),
Expand All @@ -695,6 +699,7 @@ mod tests {
delete_opstamp: 0,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_2,
tags: Default::default(),
split_files: Vec::new(),
Expand Down Expand Up @@ -812,6 +817,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down Expand Up @@ -990,6 +996,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;
mod split_fields;

pub use indexed_split::{
CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder,
Expand All @@ -53,6 +54,7 @@ pub use raw_doc_batch::RawDocBatch;
pub(crate) use shard_positions::LocalShardPositionsUpdate;
pub use shard_positions::ShardPositionsService;
pub use split_attrs::{create_split_metadata, SplitAttrs};
pub use split_fields::{read_split_fields, serialize_split_fields, FieldConfig};

#[derive(Debug)]
pub struct NewPublishToken(pub PublishToken);
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/models/packaged_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::merge_policy::MergeOperation;
use crate::models::{PublishLock, SplitAttrs};

pub struct PackagedSplit {
pub serialized_split_fields: Vec<u8>,
pub split_attrs: SplitAttrs,
pub split_scratch_directory: TempDirectory,
pub tags: BTreeSet<String>,
Expand Down
Loading

0 comments on commit 73fde83

Please sign in to comment.