Skip to content

Commit

Permalink
Add --blob-cache-dir arg use to generate raw blob cache and meta
Browse files Browse the repository at this point in the history
generate blob cache and blob meta through the —-blob-cache-dir parameters,
so that nydusd can be started directly from these two files without
going to the backend to download. this can improve the performance
of data loading in localfs mode.

Signed-off-by: zyfjeff <[email protected]>
  • Loading branch information
zyfjeff committed Oct 10, 2023
1 parent b777564 commit 0f4ff08
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 12 deletions.
4 changes: 3 additions & 1 deletion builder/src/core/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ impl Blob {
} else if ctx.blob_tar_reader.is_some() {
header.set_separate_blob(true);
};

let mut compressor = Self::get_compression_algorithm_for_meta(ctx);
let (compressed_data, compressed) = compress::compress(ci_data, compressor)
.with_context(|| "failed to compress blob chunk info array".to_string())?;
Expand Down Expand Up @@ -223,6 +222,9 @@ impl Blob {
}

blob_ctx.blob_meta_header = header;
if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
blob_cache.write_blob_meta(ci_data, &header)?;
}
let encrypted_header =
crypt::encrypt_with_context(header.as_bytes(), cipher_obj, cipher_ctx, encrypt)?;
let header_size = encrypted_header.len();
Expand Down
88 changes: 85 additions & 3 deletions builder/src/core/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::collections::{HashMap, VecDeque};
use std::convert::TryFrom;
use std::fs::{remove_file, rename, File, OpenOptions};
use std::io::{BufWriter, Cursor, Read, Seek, Write};
use std::mem::size_of;
use std::os::unix::fs::FileTypeExt;
use std::path::{Display, Path, PathBuf};
use std::str::FromStr;
Expand Down Expand Up @@ -40,7 +41,7 @@ use nydus_storage::meta::{
BlobMetaChunkArray, BlobMetaChunkInfo, ZranContextGenerator,
};
use nydus_utils::digest::DigestData;
use nydus_utils::{compress, digest, div_round_up, round_down, BufReaderInfo};
use nydus_utils::{compress, digest, div_round_up, round_down, try_round_up_4k, BufReaderInfo};

use super::node::ChunkSource;
use crate::core::tree::TreeNode;
Expand Down Expand Up @@ -193,7 +194,13 @@ impl Write for ArtifactMemoryWriter {
}
}

struct ArtifactFileWriter(ArtifactWriter);
pub struct ArtifactFileWriter(pub ArtifactWriter);

impl ArtifactFileWriter {
pub fn finalize(&mut self, name: Option<String>) -> Result<()> {
self.0.finalize(name)
}
}

impl RafsIoWrite for ArtifactFileWriter {
fn as_any(&self) -> &dyn Any {
Expand All @@ -215,6 +222,12 @@ impl RafsIoWrite for ArtifactFileWriter {
}
}

impl ArtifactFileWriter {
pub fn set_len(&mut self, s: u64) -> std::io::Result<()> {
self.0.file.get_mut().set_len(s)
}
}

impl Seek for ArtifactFileWriter {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
self.0.file.seek(pos)
Expand Down Expand Up @@ -367,6 +380,72 @@ impl ArtifactWriter {
}
}

pub struct BlobCacheGenerator {
blob_data: Mutex<ArtifactFileWriter>,
blob_meta: Mutex<ArtifactFileWriter>,
}

impl BlobCacheGenerator {
pub fn new(storage: ArtifactStorage) -> Result<Self> {
Ok(BlobCacheGenerator {
blob_data: Mutex::new(ArtifactFileWriter(ArtifactWriter::new(storage.clone())?)),
blob_meta: Mutex::new(ArtifactFileWriter(ArtifactWriter::new(storage)?)),
})
}

pub fn write_blob_meta(
&self,
data: &[u8],
header: &BlobCompressionContextHeader,
) -> Result<()> {
let mut guard = self.blob_meta.lock().unwrap();
let aligned_uncompressed_size = try_round_up_4k(data.len() as u64).ok_or(anyhow!(
format!("invalid input {} for try_round_up_4k", data.len())
))?;
guard.set_len(
aligned_uncompressed_size + size_of::<BlobCompressionContextHeader>() as u64,
)?;
guard
.write_all(data)
.context("failed to write blob meta data")?;
guard.seek(std::io::SeekFrom::Start(aligned_uncompressed_size))?;
guard
.write_all(header.as_bytes())
.context("failed to write blob meta header")?;
Ok(())
}

pub fn write_blob_data(
&self,
chunk_data: &[u8],
chunk_info: &ChunkWrapper,
aligned_d_size: u32,
) -> Result<()> {
let mut guard = self.blob_data.lock().unwrap();
let curr_pos = guard.seek(std::io::SeekFrom::End(0))?;
if curr_pos < chunk_info.uncompressed_offset() + aligned_d_size as u64 {
guard.set_len(chunk_info.uncompressed_offset() + aligned_d_size as u64)?;
}

guard.seek(std::io::SeekFrom::Start(chunk_info.uncompressed_offset()))?;
guard
.write_all(&chunk_data)
.context("failed to write blob cache")?;
Ok(())
}

pub fn finalize(&self, name: &str) -> Result<()> {
let blob_data_name = format!("{}.blob.data", name);
let mut guard = self.blob_data.lock().unwrap();
guard.finalize(Some(blob_data_name))?;
drop(guard);

let blob_meta_name = format!("{}.blob.meta", name);
let mut guard = self.blob_meta.lock().unwrap();
guard.finalize(Some(blob_meta_name))
}
}

/// BlobContext is used to hold the blob information of a layer during build.
pub struct BlobContext {
/// Blob id (user specified or sha256(blob)).
Expand Down Expand Up @@ -1182,6 +1261,8 @@ pub struct BuildContext {

pub features: Features,
pub configuration: Arc<ConfigV2>,
/// Generate the blob cache and blob meta
pub blob_cache_generator: Option<BlobCacheGenerator>,
}

impl BuildContext {
Expand Down Expand Up @@ -1221,7 +1302,6 @@ impl BuildContext {
} else {
crypt::Algorithm::None
};

BuildContext {
blob_id,
aligned_chunk,
Expand Down Expand Up @@ -1250,6 +1330,7 @@ impl BuildContext {

features,
configuration: Arc::new(ConfigV2::default()),
blob_cache_generator: None,
}
}

Expand Down Expand Up @@ -1299,6 +1380,7 @@ impl Default for BuildContext {
blob_inline_meta: false,
features: Features::new(),
configuration: Arc::new(ConfigV2::default()),
blob_cache_generator: None,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions builder/src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ impl Node {
chunk.set_compressed(is_compressed);
}

if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
blob_cache.write_blob_data(chunk_data, chunk, aligned_d_size)?;
}
event_tracer!("blob_uncompressed_size", +d_size);

Ok(chunk_info)
Expand Down
8 changes: 6 additions & 2 deletions builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub use self::compact::BlobCompactor;
pub use self::core::bootstrap::Bootstrap;
pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict};
pub use self::core::context::{
ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapContext, BootstrapManager,
BuildContext, BuildOutput, ConversionType,
ArtifactFileWriter, ArtifactStorage, ArtifactWriter, BlobCacheGenerator, BlobContext,
BlobManager, BootstrapContext, BootstrapManager, BuildContext, BuildOutput, ConversionType,
};
pub use self::core::feature::{Feature, Features};
pub use self::core::node::{ChunkSource, NodeChunk};
Expand Down Expand Up @@ -238,6 +238,10 @@ fn finalize_blob(
if !is_tarfs {
blob_writer.finalize(Some(blob_meta_id))?;
}

if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
blob_cache.finalize(&blob_ctx.blob_id)?;
}
}

Ok(())
Expand Down
120 changes: 120 additions & 0 deletions smoke/tests/blobcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package tests

import (
"fmt"
"io"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/containerd/containerd/log"
"github.com/dragonflyoss/image-service/smoke/tests/texture"
"github.com/dragonflyoss/image-service/smoke/tests/tool"
"github.com/dragonflyoss/image-service/smoke/tests/tool/test"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
)

type BlobCacheTestSuite struct {
T *testing.T
}

func (a *BlobCacheTestSuite) compareTwoFiles(t *testing.T, left, right string) {

lf, err := os.Open(left)
require.NoError(t, err)
defer lf.Close()
leftDigester, err := digest.FromReader(lf)
require.NoError(t, err)


rf, err := os.Open(right)
require.NoError(t, err)
defer rf.Close()
rightDigester, err := digest.FromReader(rf)
require.NoError(t, err)

require.Equal(t, leftDigester, rightDigester)
}

func (a *BlobCacheTestSuite) TestGenerateBlobcache(t *testing.T) {

ctx := tool.DefaultContext(t)

ctx.PrepareWorkDir(t)
defer ctx.Destroy(t)

rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))

rootfsReader := rootFs.ToOCITar(t);

ociBlobDigester := digest.Canonical.Digester()
ociBlob, err := ioutil.TempFile(ctx.Env.BlobDir, "oci-blob-")
require.NoError(t, err)

_, err = io.Copy(io.MultiWriter(ociBlobDigester.Hash(), ociBlob), rootfsReader)
require.NoError(t, err)

ociBlobDigest := ociBlobDigester.Digest()
err = os.Rename(ociBlob.Name(), filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex()))
require.NoError(t, err)

// use to generate blob.data and blob.meta
blobcacheDir := filepath.Join(ctx.Env.WorkDir, "blobcache")
err = os.MkdirAll(blobcacheDir, 0755)
require.NoError(t, err)

ctx.Env.BootstrapPath = filepath.Join(ctx.Env.WorkDir, "bootstrap")


tool.Run(t, fmt.Sprintf("%s create -t targz-ref --bootstrap %s --blob-dir %s --blob-cache-dir %s %s",
ctx.Binary.Builder, ctx.Env.BootstrapPath, ctx.Env.BlobDir, blobcacheDir,
filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex())));

nydusd, err := tool.NewNydusd(tool.NydusdConfig{
NydusdPath: ctx.Binary.Nydusd,
BootstrapPath: ctx.Env.BootstrapPath,
ConfigPath: filepath.Join(ctx.Env.WorkDir, "nydusd-config.fusedev.json"),
MountPath: ctx.Env.MountDir,
APISockPath: filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock"),
BackendType: "localfs",
BackendConfig: fmt.Sprintf(`{"dir": "%s"}`, ctx.Env.BlobDir),
EnablePrefetch: ctx.Runtime.EnablePrefetch,
BlobCacheDir: ctx.Env.CacheDir,
CacheType: ctx.Runtime.CacheType,
CacheCompressed: ctx.Runtime.CacheCompressed,
RafsMode: ctx.Runtime.RafsMode,
DigestValidate: false,
})
require.NoError(t, err)

err = nydusd.Mount()
require.NoError(t, err)
defer func() {
if err := nydusd.Umount(); err != nil {
log.L.WithError(err).Errorf("umount")
}
}()

// make sure blobcache ready
err = filepath.WalkDir(ctx.Env.MountDir, func(path string, entry fs.DirEntry, err error) error {
require.Nil(t, err)
if entry.Type().IsRegular() {
targetPath, err := filepath.Rel(ctx.Env.MountDir, path)
require.NoError(t, err)
_, _ = os.ReadFile(targetPath)
}
return nil
})
require.NoError(t, err)

a.compareTwoFiles(t, filepath.Join(blobcacheDir,fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())))
a.compareTwoFiles(t, filepath.Join(blobcacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())))
}


func TestBlobCache(t *testing.T) {
test.Run(t, &BlobCacheTestSuite{T: t})
}
6 changes: 3 additions & 3 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (l *Layer) TargetPath(t *testing.T, path string) string {

func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir string) digest.Digest {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand All @@ -141,7 +141,7 @@ func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir stri

func (l *Layer) PackRef(t *testing.T, ctx Context, blobDir string, compress bool) (digest.Digest, digest.Digest) {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand Down Expand Up @@ -238,7 +238,7 @@ func (l *Layer) recordFileTree(t *testing.T) {
})
}

func (l *Layer) toOCITar(t *testing.T) io.ReadCloser {
func (l *Layer) ToOCITar(t *testing.T) io.ReadCloser {
return archive.Diff(context.Background(), "", l.workDir)
}

Expand Down
Loading

0 comments on commit 0f4ff08

Please sign in to comment.