Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store split_fields in split #4190

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ vrl = { version = "0.8.1", default-features = false, features = [
warp = "0.3"
whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" }
wiremock = "0.5"
zstd = "0.13.0"

aws-config = "0.55.0"
aws-credential-types = { version = "0.55.0", features = [
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
7 changes: 7 additions & 0 deletions quickwit/quickwit-directories/src/bundle_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {
use std::fs::File;
use std::io::Write;

use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME;
use quickwit_storage::{PutPayload, SplitPayloadBuilder};

use super::*;
Expand All @@ -182,6 +183,7 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[],
&[
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
],
Expand Down Expand Up @@ -213,6 +215,7 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[],
&[
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
],
Expand Down Expand Up @@ -251,13 +254,17 @@ mod tests {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&[test_filepath1.clone(), test_filepath2.clone()],
&[5, 5, 5],
&[1, 2, 3],
)?;

let data = split_streamer.read_all().await?;

let bundle_dir = BundleDirectory::open_split(FileSlice::from(data.to_vec()))?;

let field_data = bundle_dir.atomic_read(Path::new(SPLIT_FIELDS_FILE_NAME))?;
assert_eq!(&*field_data, &[5, 5, 5]);

let f1_data = bundle_dir.atomic_read(Path::new("f1"))?;
assert_eq!(&*f1_data, &[123u8, 76u8]);

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
vrl = { workspace = true, optional = true }
zstd = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-aws = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ mod tests {
let split_store = {
let mut storage_builder = RamStorageBuilder::default();
for split in &splits_to_merge {
let buffer = SplitPayloadBuilder::get_split_payload(&[], &[1, 2, 3])?
let buffer = SplitPayloadBuilder::get_split_payload(&[], &[], &[1, 2, 3])?
.read_all()
.await?;
storage_builder = storage_builder.put(&split_file(split.split_id()), &buffer);
Expand Down
9 changes: 7 additions & 2 deletions quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite")

use crate::actors::Uploader;
use crate::models::{
EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch,
serialize_split_fields, EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit,
PackagedSplitBatch,
};

/// The role of the packager is to get an index writer and
Expand Down Expand Up @@ -186,7 +187,6 @@ impl Handler<EmptySplit> for Packager {
}
}

/// returns true iff merge is required to reach a state where
fn list_split_files(
segment_metas: &[SegmentMeta],
scratch_directory: &TempDirectory,
Expand Down Expand Up @@ -287,6 +287,9 @@ fn create_packaged_split(
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;

let fields_metadata = split.index.fields_metadata()?;

let mut tags = BTreeSet::default();
for named_field in tag_fields {
let inverted_indexes = index_reader
Expand All @@ -312,8 +315,10 @@ fn create_packaged_split(
let mut hotcache_bytes = Vec::new();
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
ctx.record_progress();
let serialized_split_fields = serialize_split_fields(&fields_metadata);

let packaged_split = PackagedSplit {
serialized_split_fields,
split_attrs: split.split_attrs,
split_scratch_directory: split.split_scratch_directory,
tags,
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Handler<PackagedSplitBatch> for Uploader {

let split_streamer = SplitPayloadBuilder::get_split_payload(
&packaged_split.split_files,
&packaged_split.serialized_split_fields,
&packaged_split.hotcache_bytes,
)?;
let split_metadata = create_split_metadata(
Expand Down Expand Up @@ -465,6 +466,7 @@ async fn upload_split(
) -> anyhow::Result<()> {
let split_streamer = SplitPayloadBuilder::get_split_payload(
&packaged_split.split_files,
&packaged_split.serialized_split_fields,
&packaged_split.hotcache_bytes,
)?;

Expand Down Expand Up @@ -561,6 +563,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down Expand Up @@ -672,6 +675,7 @@ mod tests {
delete_opstamp: 0,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_1,
tags: Default::default(),
split_files: Vec::new(),
Expand All @@ -695,6 +699,7 @@ mod tests {
delete_opstamp: 0,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory: split_scratch_directory_2,
tags: Default::default(),
split_files: Vec::new(),
Expand Down Expand Up @@ -812,6 +817,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down Expand Up @@ -990,6 +996,7 @@ mod tests {
delete_opstamp: 10,
num_merge_ops: 0,
},
serialized_split_fields: Vec::new(),
split_scratch_directory,
tags: Default::default(),
hotcache_bytes: Vec::new(),
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;
mod split_fields;

pub use indexed_split::{
CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder,
Expand All @@ -53,6 +54,7 @@ pub use raw_doc_batch::RawDocBatch;
pub(crate) use shard_positions::LocalShardPositionsUpdate;
pub use shard_positions::ShardPositionsService;
pub use split_attrs::{create_split_metadata, SplitAttrs};
pub use split_fields::{read_split_fields, serialize_split_fields, FieldConfig};

#[derive(Debug)]
pub struct NewPublishToken(pub PublishToken);
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/models/packaged_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::merge_policy::MergeOperation;
use crate::models::{PublishLock, SplitAttrs};

pub struct PackagedSplit {
pub serialized_split_fields: Vec<u8>,
pub split_attrs: SplitAttrs,
pub split_scratch_directory: TempDirectory,
pub tags: BTreeSet<String>,
Expand Down
Loading