storage: add runtime support for batch chunk #1289

Merged: 6 commits, Jun 15, 2023
54 changes: 33 additions & 21 deletions rafs/src/builder/core/node.rs
@@ -393,59 +393,71 @@ impl Node {
         chunk_data: &[u8],
         chunk: &mut ChunkWrapper,
     ) -> Result<Option<BlobChunkInfoV2Ondisk>> {
-        let uncompressed_size = chunk_data.len() as u32;
-        let aligned_chunk_size = if ctx.aligned_chunk {
+        let d_size = chunk_data.len() as u32;
+        let aligned_d_size = if ctx.aligned_chunk {
             // Safe to unwrap because `chunk_size` is much less than u32::MAX.
-            try_round_up_4k(uncompressed_size).unwrap()
+            try_round_up_4k(d_size).unwrap()
         } else {
-            uncompressed_size
+            d_size
         };
-        let pre_uncompressed_offset = blob_ctx.current_uncompressed_offset;
-        blob_ctx.uncompressed_blob_size = pre_uncompressed_offset + aligned_chunk_size as u64;
-        blob_ctx.current_uncompressed_offset += aligned_chunk_size as u64;
-        chunk.set_uncompressed_offset(pre_uncompressed_offset);
-        chunk.set_uncompressed_size(uncompressed_size);
+        let pre_d_offset = blob_ctx.current_uncompressed_offset;
+        blob_ctx.uncompressed_blob_size = pre_d_offset + aligned_d_size as u64;
+        blob_ctx.current_uncompressed_offset += aligned_d_size as u64;
+        chunk.set_uncompressed_offset(pre_d_offset);
+        chunk.set_uncompressed_size(d_size);
 
         let mut chunk_info = None;
 
         if self.inode.child_count() == 1
-            && uncompressed_size < ctx.batch_size / 2
+            && d_size < ctx.batch_size / 2
             && ctx.blob_batch_generator.is_some()
         {
             // This chunk will be added into a batch chunk.
             let mut batch = ctx.blob_batch_generator.as_ref().unwrap().lock().unwrap();
 
-            if batch.chunk_data_buf_len() as u32 + uncompressed_size < ctx.batch_size {
+            if batch.chunk_data_buf_len() as u32 + d_size < ctx.batch_size {
                 // Add into current batch chunk directly.
-                chunk_info =
-                    Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
+                chunk_info = Some(batch.generate_chunk_info(pre_d_offset, d_size)?);
                 batch.append_chunk_data_buf(chunk_data);
             } else {
                 // Dump current batch chunk if exists, and then add into a new batch chunk.
                 if !batch.chunk_data_buf_is_empty() {
                     // Dump current batch chunk.
-                    let (pre_compressed_offset, compressed_size, _) =
+                    let (pre_c_offset, c_size, _) =
                         Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
-                    batch.add_context(pre_compressed_offset, compressed_size);
+                    batch.add_context(pre_c_offset, c_size);
                     batch.clear_chunk_data_buf();
                 }
 
                 // Add into a new batch chunk.
-                chunk_info =
-                    Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
+                chunk_info = Some(batch.generate_chunk_info(pre_d_offset, d_size)?);
                 batch.append_chunk_data_buf(chunk_data);
             }
         } else if !ctx.blob_features.contains(BlobFeatures::SEPARATE) {
             // For other case which needs to write chunk data to data blobs.
-            let (pre_compressed_offset, compressed_size, is_compressed) =
+
+            // Interrupt and dump buffered batch chunks.
+            // TODO: cancel the interruption.
+            if let Some(batch) = &ctx.blob_batch_generator {
+                let mut batch = batch.lock().unwrap();
+                if !batch.chunk_data_buf_is_empty() {
+                    // Dump current batch chunk.
+                    let (pre_c_offset, c_size, _) =
+                        Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
+                    batch.add_context(pre_c_offset, c_size);
+                    batch.clear_chunk_data_buf();
+                }
+            }
+
+            let (pre_c_offset, c_size, is_compressed) =
                 Self::write_chunk_data(ctx, blob_ctx, blob_writer, chunk_data)
                     .with_context(|| format!("failed to write chunk data {:?}", self.path()))?;
-            chunk.set_compressed_offset(pre_compressed_offset);
-            chunk.set_compressed_size(compressed_size);
+            chunk.set_compressed_offset(pre_c_offset);
+            chunk.set_compressed_size(c_size);
             chunk.set_compressed(is_compressed);
         }
 
-        event_tracer!("blob_uncompressed_size", +uncompressed_size);
+        event_tracer!("blob_uncompressed_size", +d_size);
 
         Ok(chunk_info)
     }
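The builder change above is easier to follow with a standalone illustration. The sketch below is not part of this PR; `BatchGenerator`, `add_chunk`, and `dump` are simplified stand-ins for the real builder types. It shows the core idea: small chunks are accumulated in an in-memory buffer and flushed as a single compressed batch once the configured batch size would be exceeded, recording one (compressed offset, compressed size) context per dumped batch.

```rust
// Minimal sketch of the batch-chunk accumulation strategy; the names below
// (`BatchGenerator`, `add_chunk`, `dump`) are illustrative, not nydus APIs.

struct BatchGenerator {
    batch_size: u32,
    buf: Vec<u8>,
    // One (compressed_offset, compressed_size) entry per dumped batch chunk.
    contexts: Vec<(u64, u32)>,
}

impl BatchGenerator {
    fn new(batch_size: u32) -> Self {
        Self {
            batch_size,
            buf: Vec::new(),
            contexts: Vec::new(),
        }
    }

    /// Append one chunk's uncompressed data; `dump` writes a full batch to the
    /// blob and returns where it landed. Returns true if a batch was dumped.
    fn add_chunk(&mut self, chunk_data: &[u8], mut dump: impl FnMut(&[u8]) -> (u64, u32)) -> bool {
        let d_size = chunk_data.len() as u32;
        let mut dumped = false;
        if !self.buf.is_empty() && self.buf.len() as u32 + d_size >= self.batch_size {
            // Current batch is full: write it out as one compressed unit.
            let ctx = dump(self.buf.as_slice());
            self.contexts.push(ctx);
            self.buf.clear();
            dumped = true;
        }
        self.buf.extend_from_slice(chunk_data);
        dumped
    }
}

fn main() {
    let mut batcher = BatchGenerator::new(0x1000);
    let mut offset = 0u64;
    // Stand-in for compression + writing: just track where each batch would land.
    let mut dump = |data: &[u8]| {
        let ctx = (offset, data.len() as u32);
        offset += data.len() as u64;
        ctx
    };

    let chunks = vec![vec![0u8; 0x600], vec![1u8; 0x700], vec![2u8; 0x500]];
    for chunk in &chunks {
        batcher.add_chunk(chunk.as_slice(), &mut dump);
    }
    // One batch of 0xD00 bytes was dumped; 0x500 bytes are still buffered.
    println!("dumped: {:?}, pending: {:#x}", batcher.contexts, batcher.buf.len());
}
```

With a 0x1000-byte batch size and three small chunks, one 0xD00-byte batch is dumped and 0x500 bytes stay buffered, mirroring how the builder above defers the final batch flush until a later chunk (or the direct-write path) forces it out.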
1 change: 1 addition & 0 deletions smoke/tests/compatibility_test.go
@@ -77,6 +77,7 @@ func (c *CompatibilityTestSuite) TestConvertImages() test.Generator {
         ctx.Build.Compressor = "lz4_block"
         ctx.Build.ChunkSize = "0x100000"
         ctx.Build.OCIRef = false
+        ctx.Build.BatchSize = "0"
 
         image := c.prepareImage(c.t, scenario.GetString(paramImage))
         return scenario.Str(), func(t *testing.T) {
24 changes: 19 additions & 5 deletions smoke/tests/image_test.go
@@ -15,7 +15,8 @@ import (
 )
 
 const (
-    paramZran = "zran"
+    paramZran  = "zran"
+    paramBatch = "batch"
 )
 
 type ImageTestSuite struct {
@@ -30,10 +31,16 @@ func (i *ImageTestSuite) TestConvertImages() test.Generator {
         Dimension(paramImage, []interface{}{"nginx:latest"}).
         Dimension(paramFSVersion, []interface{}{"5", "6"}).
         Dimension(paramZran, []interface{}{false, true}).
+        Dimension(paramBatch, []interface{}{"0", "0x100000"}).
         Skip(
             func(param *tool.DescartesItem) bool {
-                // Zran not work with rafs v6.
-                return param.GetString(paramFSVersion) == "5" && param.GetBool(paramZran)
+                // Zran and Batch not work with rafs v5.
+                if param.GetString(paramFSVersion) == "5" && (param.GetBool(paramZran) || param.GetString(paramBatch) != "0") {
+                    return true
+                }
+
+                // Zran and Batch can not work together.
+                return param.GetBool(paramZran) && param.GetString(paramBatch) != "0"
             })
 
     return func() (name string, testCase test.Case) {
@@ -45,6 +52,7 @@ func (i *ImageTestSuite) TestConvertImages() test.Generator {
         ctx := tool.DefaultContext(i.T)
         ctx.Build.FSVersion = scenario.GetString(paramFSVersion)
         ctx.Build.OCIRef = scenario.GetBool(paramZran)
+        ctx.Build.BatchSize = scenario.GetString(paramBatch)
 
         image := i.prepareImage(i.T, scenario.GetString(paramImage))
         return scenario.Str(), func(t *testing.T) {
@@ -64,6 +72,12 @@ func (i *ImageTestSuite) TestConvertImage(t *testing.T, ctx tool.Context, source
     if ctx.Build.OCIRef {
         enableOCIRef = "--oci-ref"
     }
+
+    enableBatchSize := ""
+    if ctx.Build.BatchSize != "0" {
+        enableBatchSize = "--batch-size " + ctx.Build.BatchSize
+    }
+
     target := fmt.Sprintf("%s-nydus-%s", source, uuid.NewString())
     fsVersion := fmt.Sprintf("--fs-version %s", ctx.Build.FSVersion)
     logLevel := "--log-level warn"
@@ -78,8 +92,8 @@
 
     // Convert image
     convertCmd := fmt.Sprintf(
-        "%s %s convert --source %s --target %s %s %s --nydus-image %s --work-dir %s %s",
-        ctx.Binary.Nydusify, logLevel, source, target, fsVersion, enableOCIRef, ctx.Binary.Builder, ctx.Env.WorkDir, compressor,
+        "%s %s convert --source %s --target %s %s %s %s --nydus-image %s --work-dir %s %s",
+        ctx.Binary.Nydusify, logLevel, source, target, fsVersion, enableOCIRef, enableBatchSize, ctx.Binary.Builder, ctx.Env.WorkDir, compressor,
     )
     tool.RunWithoutOutput(t, convertCmd)
 
6 changes: 6 additions & 0 deletions smoke/tests/native_layer_test.go
@@ -41,6 +41,7 @@ func (n *NativeLayerTestSuite) TestMakeLayers() test.Generator {
         Dimension(paramCacheCompressed, []interface{}{true, false}).
         Dimension(paramRafsMode, []interface{}{"direct", "cached"}).
         Dimension(paramEnablePrefetch, []interface{}{false, true}).
+        Dimension(paramBatch, []interface{}{"0", "0x100000"}).
         Skip(func(param *tool.DescartesItem) bool {
 
             // rafs v6 not support cached mode nor dummy cache
@@ -53,6 +54,11 @@
                 return true
             }
 
+            // Batch not work with rafs v5.
+            if param.GetString(paramFSVersion) == "5" && param.GetString(paramBatch) != "0" {
+                return true
+            }
+
             return false
         })
 
1 change: 1 addition & 0 deletions smoke/tests/tool/context.go
@@ -28,6 +28,7 @@ type BuildContext struct {
     ChunkSize   string
     OCIRef      bool
     OCIRefGzip  bool
+    BatchSize   string
 }
 
 type RuntimeContext struct {
66 changes: 50 additions & 16 deletions storage/src/cache/cachedfile.rs
@@ -157,6 +157,8 @@ pub(crate) struct FileCacheEntry {
     pub(crate) is_legacy_stargz: bool,
     // The blob is for an RAFS filesystem in `TARFS` mode.
     pub(crate) is_tarfs: bool,
+    // The blob contains batch chunks.
+    pub(crate) is_batch: bool,
     // The blob is based on ZRan decompression algorithm.
     pub(crate) is_zran: bool,
     // True if direct IO is enabled for the `self.file`, supported for fscache only.
@@ -403,6 +405,18 @@ impl FileCacheEntry {
                 .get_zran_context(zran_index)
                 .ok_or_else(|| einval!("failed to get ZRan context for chunk"))?;
             let blob_end = ctx.in_offset + ctx.in_len as u64;
             (blob_start, blob_end)
+        } else if self.is_batch {
+            let meta = self
+                .get_blob_meta_info()?
+                .ok_or_else(|| einval!("failed to get blob meta object"))?;
+
+            let (c_offset, _) = meta.get_compressed_info(chunks[0].id())?;
+            let blob_start = c_offset;
+
+            let (c_offset, c_size) = meta.get_compressed_info(chunks[chunks.len() - 1].id())?;
+            let blob_end = c_offset + c_size as u64;
+
+            (blob_start, blob_end)
         } else {
             let last = chunks.len() - 1;
@@ -459,6 +473,10 @@ impl BlobCache for FileCacheEntry {
         self.is_legacy_stargz
     }
 
+    fn is_batch(&self) -> bool {
+        self.is_batch
+    }
+
     fn is_zran(&self) -> bool {
         self.is_zran
     }
@@ -1004,14 +1022,15 @@ impl FileCacheEntry {
                 } else {
                     BlobIoTag::Internal
                 };
+
+                let (start, len) = if let Ok(Some(meta)) = self.get_blob_meta_info() {
+                    meta.get_compressed_info(chunk.id())?
+                } else {
+                    (chunk.compressed_offset(), chunk.compressed_size())
+                };
+
                 // NOTE: Only this request region can read more chunks from backend with user io.
-                state.push(
-                    RegionType::Backend,
-                    chunk.compressed_offset(),
-                    chunk.compressed_size(),
-                    tag,
-                    Some(chunk.clone()),
-                )?;
+                state.push(RegionType::Backend, start, len, tag, Some(chunk.clone()))?;
             }
         }
 
@@ -1093,7 +1112,8 @@ impl FileCacheEntry {
                     tag_set.insert(chunk.id());
                 }
             }
-            region_hold = Region::with(region, v);
+
+            region_hold = Region::with(self, region, v)?;
             for (idx, c) in region_hold.chunks.iter().enumerate() {
                 if tag_set.contains(&c.id()) {
                     region_hold.tags[idx] = true;
@@ -1447,16 +1467,30 @@ impl Region {
         }
     }
 
-    fn with(region: &Region, chunks: Vec<Arc<dyn BlobChunkInfo>>) -> Self {
+    fn with(
+        ctx: &FileCacheEntry,
+        region: &Region,
+        chunks: Vec<Arc<dyn BlobChunkInfo>>,
+    ) -> Result<Self> {
         assert!(!chunks.is_empty());
         let len = chunks.len();
-        let blob_address = chunks[0].compressed_offset();
-        let last = &chunks[len - 1];
-        let sz = last.compressed_offset() - blob_address;
-        assert!(sz < u32::MAX as u64);
-        let blob_len = sz as u32 + last.compressed_size();
 
-        Region {
+        let meta = ctx
+            .get_blob_meta_info()?
+            .ok_or_else(|| einval!("failed to get blob meta object"))?;
+        let (blob_address, blob_len) = if ctx.is_batch && meta.is_batch_chunk(chunks[0].id()) {
+            // Assert all chunks are in the same batch.
+            meta.get_compressed_info(chunks[0].id())?
+        } else {
+            let ba = chunks[0].compressed_offset();
+            let last = &chunks[len - 1];
+            let sz = last.compressed_offset() - ba;
+            assert!(sz < u32::MAX as u64);
+
+            (ba, sz as u32 + last.compressed_size())
+        };
+
+        Ok(Region {
             r#type: region.r#type,
             status: region.status,
             count: len as u32,
@@ -1465,7 +1499,7 @@ impl Region {
             blob_address,
             blob_len,
             seg: region.seg.clone(),
-        }
+        })
     }
 
     fn append(
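The runtime side above boils down to one rule: when chunks belong to a batch, the compressed range to fetch from the backend comes from the blob meta, which maps each chunk to the compressed unit it shares with its neighbours, instead of from the chunk's own compressed offset and size. A minimal sketch of that lookup follows; the types and method names here are hypothetical, not the nydus storage API.

```rust
// Standalone sketch: deriving the backend read range for batch chunks from a
// (hypothetical) blob meta instead of per-chunk compressed offsets.

#[derive(Clone, Copy, Debug, PartialEq)]
struct CompressedInfo {
    offset: u64,
    size: u32,
}

// Hypothetical blob meta: maps a chunk index to the compressed (batch) unit
// that actually holds its data.
struct BlobMeta {
    infos: Vec<CompressedInfo>,
}

impl BlobMeta {
    fn get_compressed_info(&self, chunk_index: usize) -> CompressedInfo {
        self.infos[chunk_index]
    }
}

/// Compute the [start, end) byte range to fetch for a run of chunk indexes.
fn backend_range(meta: &BlobMeta, chunks: &[usize]) -> (u64, u64) {
    let first = meta.get_compressed_info(chunks[0]);
    let last = meta.get_compressed_info(*chunks.last().unwrap());
    (first.offset, last.offset + last.size as u64)
}

fn main() {
    // Two batch units: chunks 0-2 share the first unit, chunk 3 uses the second.
    let unit0 = CompressedInfo { offset: 0, size: 0x800 };
    let unit1 = CompressedInfo { offset: 0x800, size: 0x400 };
    let meta = BlobMeta { infos: vec![unit0, unit0, unit0, unit1] };

    // Reading chunks 1..=3 requires both units: bytes [0, 0xC00).
    assert_eq!(backend_range(&meta, &[1, 2, 3]), (0, 0xC00));
    println!("ok");
}
```

This is the reason the PR threads `is_batch` and the blob meta through `FileCacheEntry` and `Region::with`: chunks that share one batch unit have to be fetched and decompressed as a single region rather than as independent compressed extents.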
5 changes: 4 additions & 1 deletion storage/src/cache/filecache/mod.rs
@@ -182,6 +182,7 @@ impl FileCacheEntry {
     ) -> Result<Self> {
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
         let is_tarfs = blob_info.features().is_tarfs();
+        let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
         let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
         let blob_id = blob_info.blob_id();
         let blob_meta_id = if is_separate_meta {
@@ -296,12 +297,13 @@ impl FileCacheEntry {
         };
 
         trace!(
-            "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, zran {}",
+            "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
             mgr.cache_raw_data,
             is_direct_chunkmap,
             is_legacy_stargz,
             is_separate_meta,
             is_tarfs,
+            is_batch,
             is_zran,
         );
         Ok(FileCacheEntry {
@@ -326,6 +328,7 @@ impl FileCacheEntry {
             is_direct_chunkmap,
             is_legacy_stargz,
             is_tarfs,
+            is_batch,
             is_zran,
             dio_enabled: false,
             need_validation,
2 changes: 2 additions & 0 deletions storage/src/cache/fscache/mod.rs
@@ -210,6 +210,7 @@ impl FileCacheEntry {
             .get_fscache_file()
             .ok_or_else(|| einval!("No fscache file associated with the blob_info"))?;
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
+        let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
         let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
         let cache_cipher = blob_info.cipher();
         let is_cache_encrypted = cache_cipher.is_encryption_enabled();
@@ -283,6 +284,7 @@ impl FileCacheEntry {
             is_cache_encrypted,
             is_legacy_stargz: blob_info.is_legacy_stargz(),
             is_tarfs,
+            is_batch,
             is_zran,
             dio_enabled: true,
             need_validation,