From c8a39c876ab84704193b3867fed089cbe55d158e Mon Sep 17 00:00:00 2001 From: ccx1024cc <1261138729@qq.com> Date: Wed, 12 Jul 2023 08:59:50 +0800 Subject: [PATCH] fix: amplify io is too large to hold in fuse buffer (#1311) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: amplify io is too large to hold in fuse buffer Fuse request buffer is fixed by `FUSE_KERN_BUF_SIZE * pagesize() + FUSE_HEADER_ SIZE`. When amplify io is larger than it, FuseDevWriter suffers from smaller buffer. As a result, invalid data error is returned. Reproduction: run nydusd with 3MB amplify_io error from random io: reply error header OutHeader { len: 16, error: -5, unique: 108 }, error Custom { kind: InvalidData, error: "data out of range, available 1052656 requested 1250066" } Details: size of fuse buffer = 1052656 + 16 (size of inner header) = 256(page number) * 4096(page size) + 4096(fuse header) let amplify_io = min(user_specified, fuseWriter.available_bytes()) Resolution: This pr is not best implements, but independent of modification to [fuse-backend-rs]("https://github.com/cloud-hypervisor/fuse-backend-rs"). In future, evalucation of amplify_io will be replaced with [ZeroCopyWriter.available_bytes()]("https://github.com/cloud-hypervisor/fuse-backend-rs/pull/135"). Signed-off-by: 泰友 * feat: e2e for amplify io larger than fuse buffer Signed-off-by: 泰友 --------- Signed-off-by: 泰友 Co-authored-by: 泰友 --- Cargo.toml | 2 +- rafs/src/fs.rs | 9 ++--- smoke/Makefile | 2 +- smoke/tests/api_test.go | 2 +- smoke/tests/image_test.go | 7 ++-- smoke/tests/native_layer_test.go | 62 ++++++++++++++++++++++++++++++++ smoke/tests/tool/context.go | 2 ++ smoke/tests/tool/iterator.go | 4 +++ smoke/tests/tool/nydusd.go | 4 ++- smoke/tests/tool/verify.go | 1 + 10 files changed, 84 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7422857dd59..eb72e64a536 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" anyhow = "1" clap = { version = "4.0.18", features = ["derive", "cargo"] } flexi_logger = { version = "0.25", features = ["compress"] } -fuse-backend-rs = "^0.10.3" +fuse-backend-rs = "^0.10.4" hex = "0.4.3" hyper = "0.14.11" hyperlocal = "0.8.0" diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index cad27019b4a..e958c5f8f55 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -621,19 +621,20 @@ impl FileSystem for Rafs { assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. - if self.sb.meta.is_v5() && size < self.amplify_io { + let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32; + if self.sb.meta.is_v5() && size < amplify_io { let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; let window_base = cmp::min(next_chunk_base, inode_size); let actual_size = window_base - (offset & !chunk_mask); - if actual_size < self.amplify_io as u64 { - let window_size = self.amplify_io as u64 - actual_size; + if actual_size < amplify_io as u64 { + let window_size = amplify_io as u64 - actual_size; let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); self.sb.amplify_io( &self.device, - self.amplify_io, + amplify_io, &mut io_vecs, &inode, window_base, diff --git a/smoke/Makefile b/smoke/Makefile index bb7318ad5fd..96260e5c8d4 100644 --- a/smoke/Makefile +++ b/smoke/Makefile @@ -15,7 +15,7 @@ build: # NYDUS_NYDUSIFY=/path/to/latest/nydusify \ # make test test: build - sudo -E ./smoke.test -test.v -test.timeout 10m -test.parallel=8 -test.run=$(TESTS) + sudo -E ./smoke.test -test.v -test.timeout 10m -test.parallel=16 -test.run=$(TESTS) # WORK_DIR=/tmp \ # NYDUS_BUILDER=/path/to/latest/nydus-image \ diff --git a/smoke/tests/api_test.go b/smoke/tests/api_test.go index 48bd598b4f2..a05ac84f86c 100644 --- a/smoke/tests/api_test.go +++ b/smoke/tests/api_test.go @@ -195,7 +195,7 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) { config.RafsMode = ctx.Runtime.RafsMode err = nydusd.MountByAPI(config) require.NoError(t, err) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 15) bcm, err := nydusd.GetBlobCacheMetrics("") require.NoError(t, err) diff --git a/smoke/tests/image_test.go b/smoke/tests/image_test.go index ff785b38df5..c7c9e730dfb 100644 --- a/smoke/tests/image_test.go +++ b/smoke/tests/image_test.go @@ -15,9 +15,10 @@ import ( ) const ( - paramZran = "zran" - paramBatch = "batch" - paramEncrypt = "encrypt" + paramZran = "zran" + paramBatch = "batch" + paramEncrypt = "encrypt" + paramAmplifyIO = "amplify_io" ) type ImageTestSuite struct { diff --git a/smoke/tests/native_layer_test.go b/smoke/tests/native_layer_test.go index 99f1031b309..2c32eee30f9 100644 --- a/smoke/tests/native_layer_test.go +++ b/smoke/tests/native_layer_test.go @@ -43,6 +43,7 @@ func (n *NativeLayerTestSuite) TestMakeLayers() test.Generator { Dimension(paramEnablePrefetch, []interface{}{false, true}). Dimension(paramBatch, []interface{}{"0", "0x100000"}). Dimension(paramEncrypt, []interface{}{false, true}). + Dimension(paramAmplifyIO, []interface{}{uint64(0x100000)}). Skip(func(param *tool.DescartesItem) bool { // rafs v6 not support cached mode nor dummy cache @@ -77,6 +78,67 @@ func (n *NativeLayerTestSuite) TestMakeLayers() test.Generator { ctx.Runtime.CacheCompressed = scenario.GetBool(paramCacheCompressed) ctx.Runtime.RafsMode = scenario.GetString(paramRafsMode) ctx.Runtime.EnablePrefetch = scenario.GetBool(paramEnablePrefetch) + ctx.Runtime.AmplifyIO = scenario.GetUInt64(paramAmplifyIO) + + return scenario.Str(), func(t *testing.T) { + n.testMakeLayers(*ctx, t) + } + } +} + +func (n *NativeLayerTestSuite) TestAmplifyIO() test.Generator { + + scenarios := tool.DescartesIterator{} + scenarios. + + /* Common params */ + Dimension(paramCompressor, []interface{}{"lz4_block"}). + Dimension(paramFSVersion, []interface{}{"5", "6"}). + Dimension(paramChunkSize, []interface{}{"0x100000"}). + Dimension(paramCacheType, []interface{}{"blobcache"}). + Dimension(paramCacheCompressed, []interface{}{false}). + Dimension(paramRafsMode, []interface{}{"direct"}). + Dimension(paramEnablePrefetch, []interface{}{true}). + Dimension(paramBatch, []interface{}{"0x100000"}). + Dimension(paramEncrypt, []interface{}{false}). + + /* Amplify io - target param */ + Dimension(paramAmplifyIO, []interface{}{uint64(0x0), uint64(0x100000), uint64(0x10000000)}). + Skip(func(param *tool.DescartesItem) bool { + + // Rafs v6 not support cached mode nor dummy cache + if param.GetString(paramFSVersion) == "6" { + return param.GetString(paramRafsMode) == "cached" || param.GetString(paramCacheType) == "" + } + + // Dummy cache not support prefetch + if param.GetString(paramCacheType) == "" && param.GetBool(paramEnablePrefetch) { + return true + } + + // Batch or encrypt not work with rafs v5. + if param.GetString(paramFSVersion) == "5" && (param.GetString(paramBatch) != "0" || param.GetBool(paramEncrypt)) { + return true + } + + return false + }) + + return func() (name string, testCase test.Case) { + if !scenarios.HasNext() { + return + } + scenario := scenarios.Next() + + ctx := tool.DefaultContext(n.t) + ctx.Build.Compressor = scenario.GetString(paramCompressor) + ctx.Build.FSVersion = scenario.GetString(paramFSVersion) + ctx.Build.ChunkSize = scenario.GetString(paramChunkSize) + ctx.Runtime.CacheType = scenario.GetString(paramCacheType) + ctx.Runtime.CacheCompressed = scenario.GetBool(paramCacheCompressed) + ctx.Runtime.RafsMode = scenario.GetString(paramRafsMode) + ctx.Runtime.EnablePrefetch = scenario.GetBool(paramEnablePrefetch) + ctx.Runtime.AmplifyIO = scenario.GetUInt64(paramAmplifyIO) return scenario.Str(), func(t *testing.T) { n.testMakeLayers(*ctx, t) diff --git a/smoke/tests/tool/context.go b/smoke/tests/tool/context.go index 9e714eb0cf0..703613be264 100644 --- a/smoke/tests/tool/context.go +++ b/smoke/tests/tool/context.go @@ -39,6 +39,7 @@ type RuntimeContext struct { CacheCompressed bool RafsMode string EnablePrefetch bool + AmplifyIO uint64 } type EnvContext struct { @@ -75,6 +76,7 @@ func DefaultContext(t *testing.T) *Context { CacheCompressed: false, RafsMode: "direct", EnablePrefetch: true, + AmplifyIO: uint64(0x100000), }, } } diff --git a/smoke/tests/tool/iterator.go b/smoke/tests/tool/iterator.go index 37f6e7310f8..bf49e32ef1e 100644 --- a/smoke/tests/tool/iterator.go +++ b/smoke/tests/tool/iterator.go @@ -47,6 +47,10 @@ func (d *DescartesItem) Str() string { return sb.String() } +func (d *DescartesItem) GetUInt64(name string) uint64 { + return d.vals[name].(uint64) +} + // Generator of Cartesian product. // // An example is below: diff --git a/smoke/tests/tool/nydusd.go b/smoke/tests/tool/nydusd.go index ee822d1b9d9..adfae105743 100644 --- a/smoke/tests/tool/nydusd.go +++ b/smoke/tests/tool/nydusd.go @@ -69,6 +69,7 @@ type NydusdConfig struct { LatestReadFiles bool AccessPattern bool PrefetchFiles []string + AmplifyIO uint64 } type Nydusd struct { @@ -104,7 +105,8 @@ var configTpl = ` "digest_validate": {{.DigestValidate}}, "enable_xattr": true, "latest_read_files": {{.LatestReadFiles}}, - "access_pattern": {{.AccessPattern}} + "access_pattern": {{.AccessPattern}}, + "amplify_io": {{.AmplifyIO}} } ` diff --git a/smoke/tests/tool/verify.go b/smoke/tests/tool/verify.go index 8d05cf2a66e..d2d4ad277d2 100644 --- a/smoke/tests/tool/verify.go +++ b/smoke/tests/tool/verify.go @@ -29,6 +29,7 @@ func Verify(t *testing.T, ctx Context, expectedFiles map[string]*File) { CacheCompressed: ctx.Runtime.CacheCompressed, RafsMode: ctx.Runtime.RafsMode, DigestValidate: false, + AmplifyIO: ctx.Runtime.AmplifyIO, } nydusd, err := NewNydusd(config)