diff --git a/Cargo.toml b/Cargo.toml
index fbd64232437..9656f5a92c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -95,6 +95,7 @@ default = [
     "backend-s3",
     "backend-http-proxy",
     "backend-localdisk",
+    "dedup",
 ]
 virtiofs = [
     "nydus-service/virtiofs",
@@ -116,6 +117,8 @@ backend-oss = ["nydus-storage/backend-oss"]
 backend-registry = ["nydus-storage/backend-registry"]
 backend-s3 = ["nydus-storage/backend-s3"]
 
+dedup = ["nydus-storage/dedup"]
+
 [workspace]
 members = [
     "api",
diff --git a/docs/data-deduplication.md b/docs/data-deduplication.md
index 45b259ad204..1b6e0305ec3 100644
--- a/docs/data-deduplication.md
+++ b/docs/data-deduplication.md
@@ -164,4 +164,25 @@ So Nydus provides a node level CAS system to reduce data downloaded from the registry.
 The node level CAS system helps to achieve O4 and O5.
-# Node Level CAS System (WIP)
+# Node Level CAS System (Experimental)
+Data deduplication can also be achieved when accessing Nydus images. The key idea is to maintain information about data chunks available on the local host in a database.
+When a chunk is needed but not yet available in the uncompressed data blob files, we query the database using the chunk digest as the key.
+If a record with the same chunk digest already exists, it will be reused.
+We call such a system a CAS (Content Addressable Storage) system.
+
+## Chunk Deduplication by Using CAS as L2 Cache
+In this chunk deduplication mode, the CAS system works as an L2 cache to provide chunk data on demand, and it keeps Nydus bootstrap blobs as-is.
+It works as follows:
+1. query the database when a chunk is needed but not available yet
+2. copy data from the source blob to the target blob using `copy_file_range` if a record with the same chunk digest exists
+3. download the chunk data from the remote backend if there is no matching record in the database
+4. insert a new record into the database for the just-downloaded chunk so it can be reused later
+
+![chunk_dedup_l2cache](images/chunk_dedup_l2_cache.png)
+
+A data download operation can be avoided if a chunk already exists in the database.
+If the underlying filesystem supports data references, `copy_file_range` will create a reference instead of copying the data, thus reducing storage space consumption.
+This design has the benefit of robustness: the target blob file has no dependency on the database or the source blob files, which eases garbage collection.
+But it relies on the capability of the underlying filesystem to actually reduce storage consumption.
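+
+Below is a minimal, illustrative sketch of this lookup-then-copy flow. It is not the actual nydusd implementation: the real `CasMgr` persists its records in an SQLite database (the `Blobs` and `Chunks` tables) and copies data with `copy_file_range`, while this sketch uses an in-memory map and plain `std::io` reads and writes. All names in it (`CasIndex`, `ChunkRecord`, `record`, `fetch_chunk`) are made up for illustration.
+
+```rust
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::{Read, Result, Seek, SeekFrom, Write};
+
+/// Location of a chunk that is already present in a local uncompressed blob file.
+struct ChunkRecord {
+    blob_path: String,
+    offset: u64,
+    size: usize,
+}
+
+/// Toy CAS index mapping chunk digest -> existing chunk location.
+/// The real CasMgr persists this mapping in SQLite (`Blobs` and `Chunks` tables).
+struct CasIndex {
+    chunks: HashMap<String, ChunkRecord>,
+}
+
+impl CasIndex {
+    fn new() -> Self {
+        CasIndex { chunks: HashMap::new() }
+    }
+
+    /// Step 4: remember a freshly downloaded chunk so later reads can reuse it.
+    fn record(&mut self, digest: &str, blob_path: &str, offset: u64, size: usize) {
+        self.chunks.insert(
+            digest.to_string(),
+            ChunkRecord { blob_path: blob_path.to_string(), offset, size },
+        );
+    }
+
+    /// Steps 1-3: serve a chunk from an existing blob file if the index knows
+    /// about it, otherwise invoke `download` (i.e. fetch from the remote backend).
+    /// Returns true when the chunk was deduplicated from a local file.
+    fn fetch_chunk(
+        &self,
+        digest: &str,
+        target: &mut File,
+        target_offset: u64,
+        download: impl FnOnce() -> Vec<u8>,
+    ) -> Result<bool> {
+        if let Some(rec) = self.chunks.get(digest) {
+            // Hit: copy from the source blob. nydusd uses `copy_file_range` here,
+            // which becomes a reference (reflink) on filesystems that support it.
+            let mut src = File::open(&rec.blob_path)?;
+            src.seek(SeekFrom::Start(rec.offset))?;
+            let mut buf = vec![0u8; rec.size];
+            src.read_exact(&mut buf)?;
+            target.seek(SeekFrom::Start(target_offset))?;
+            target.write_all(&buf)?;
+            Ok(true)
+        } else {
+            // Miss: download from the remote backend; the caller records it afterwards.
+            let buf = download();
+            target.seek(SeekFrom::Start(target_offset))?;
+            target.write_all(&buf)?;
+            Ok(false)
+        }
+    }
+}
+```
+
+In nydusd the equivalent of `record()` runs after a downloaded chunk has been persisted into the local cache file, and the equivalent of the lookup in `fetch_chunk()` runs before falling back to the storage backend.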
+
+## Chunk Deduplication by Rebuilding Nydus Bootstrap (WIP)
diff --git a/docs/images/chunk_dedup_l2_cache.drawio b/docs/images/chunk_dedup_l2_cache.drawio
new file mode 100644
index 00000000000..c7d1417615b
--- /dev/null
+++ b/docs/images/chunk_dedup_l2_cache.drawio
@@ -0,0 +1,265 @@
[265 lines of draw.io diagram XML omitted]
diff --git a/docs/images/chunk_dedup_l2_cache.png b/docs/images/chunk_dedup_l2_cache.png
new file mode 100644
index 00000000000..e931e3f6927
Binary files /dev/null and b/docs/images/chunk_dedup_l2_cache.png differ
diff --git a/smoke/go.mod b/smoke/go.mod
index dbf95cf434f..d68c7193fba 100644
--- a/smoke/go.mod
+++ b/smoke/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/pkg/xattr v0.4.9
 	github.com/stretchr/testify v1.8.4
 	golang.org/x/sys v0.15.0
+	github.com/mattn/go-sqlite3 v1.14.23
 )
 
 require (
@@ -27,6 +28,7 @@ require (
 	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/klauspost/compress v1.17.4 // indirect
 	github.com/kr/pretty v0.3.1 // indirect
+	github.com/mattn/go-sqlite3 v1.14.23 // indirect
 	github.com/moby/sys/mountinfo v0.7.1 // indirect
 	github.com/moby/sys/sequential v0.5.0 // indirect
 	github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
diff --git a/smoke/go.sum b/smoke/go.sum
index 0fa56cfafd8..67e97a62e87 100644
--- a/smoke/go.sum
+++ b/smoke/go.sum
@@ -10,6 +10,7 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
 github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
 github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw=
 github.com/containerd/containerd v1.7.11 h1:lfGKw3eU35sjV0aG2eYZTiwFEY1pCzxdzicHP3SZILw=
+github.com/containerd/containerd v1.7.11/go.mod h1:5UluHxHTX2rdvYuZ5OJTC5m/KJNs0Zs9wVoJm9zf5ZE=
 github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8=
 github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ=
 github.com/containerd/fifo v1.1.0 h1:4I2mbh5stb1u6ycIABlBw9zgtlK8viPI9QkQNRQEEmY=
@@ -53,6 +54,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
 github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -67,7 +69,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0= +github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/moby/sys/mountinfo v0.7.1 h1:/tTvQaSJRr2FshkhXiIpux6fQ2Zvc4j7tAhMTStAG2g= +github.com/moby/sys/mountinfo v0.7.1/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -135,6 +140,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/smoke/tests/cas_test.go b/smoke/tests/cas_test.go new file mode 100644 index 00000000000..6520f6f5bdf --- /dev/null +++ b/smoke/tests/cas_test.go @@ -0,0 +1,140 @@ +// Copyright 2024 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package tests + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + + "github.com/dragonflyoss/nydus/smoke/tests/texture" + "github.com/dragonflyoss/nydus/smoke/tests/tool" + "github.com/dragonflyoss/nydus/smoke/tests/tool/test" + "github.com/stretchr/testify/require" +) + +type CasTestSuite struct{} + +func (c *CasTestSuite) TestCasTables() test.Generator { + scenarios := tool.DescartesIterator{} + scenarios.Dimension(paramEnablePrefetch, []interface{}{false, true}) + + return func() (name string, testCase test.Case) { + if !scenarios.HasNext() { + return + } + scenario := scenarios.Next() + + return scenario.Str(), func(t *testing.T) { + c.testCasTables(t, scenario.GetBool(paramEnablePrefetch)) + } + } +} + +func (c *CasTestSuite) testCasTables(t *testing.T, enablePrefetch bool) { + ctx, layer := texture.PrepareLayerWithContext(t) + ctx.Runtime.EnablePrefetch = enablePrefetch + ctx.Runtime.ChunkDedupDb = filepath.Join(ctx.Env.WorkDir, "cas.db") + + nydusd, err := tool.NewNydusdWithContext(*ctx) + require.NoError(t, err) + err = nydusd.Mount() + require.NoError(t, err) + defer nydusd.Umount() + nydusd.Verify(t, layer.FileTree) + + db, err := sql.Open("sqlite3", ctx.Runtime.ChunkDedupDb) + require.NoError(t, err) + defer db.Close() + + for _, expectedTable := range []string{"Blobs", "Chunks"} { + var count int + query := fmt.Sprintf("SELECT COUNT(*) FROM %s;", expectedTable) + err := db.QueryRow(query).Scan(&count) + require.NoError(t, err) + if expectedTable == "Blobs" { + require.Equal(t, count, 1) + } else { + require.Equal(t, count, 8) + } + } +} + +func (c *CasTestSuite) TestCasGc() test.Generator { + scenarios := tool.DescartesIterator{} + scenarios.Dimension(paramEnablePrefetch, []interface{}{false, true}) + + return func() (name string, testCase 
test.Case) { + if !scenarios.HasNext() { + return + } + scenario := scenarios.Next() + + return scenario.Str(), func(t *testing.T) { + c.testCasGc(t, scenario.GetBool(paramEnablePrefetch)) + } + } +} + +func (c *CasTestSuite) testCasGc(t *testing.T, enablePrefetch bool) { + ctx, layer := texture.PrepareLayerWithContext(t) + defer ctx.Destroy(t) + config := tool.NydusdConfig{ + NydusdPath: ctx.Binary.Nydusd, + MountPath: ctx.Env.MountDir, + APISockPath: filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock"), + ConfigPath: filepath.Join(ctx.Env.WorkDir, "nydusd-config.fusedev.json"), + ChunkDedupDb: filepath.Join(ctx.Env.WorkDir, "cas.db"), + } + nydusd, err := tool.NewNydusd(config) + require.NoError(t, err) + + err = nydusd.Mount() + defer nydusd.Umount() + require.NoError(t, err) + + config.BootstrapPath = ctx.Env.BootstrapPath + config.MountPath = "/mount" + config.BackendType = "localfs" + config.BackendConfig = fmt.Sprintf(`{"dir": "%s"}`, ctx.Env.BlobDir) + config.BlobCacheDir = ctx.Env.CacheDir + config.CacheType = ctx.Runtime.CacheType + config.CacheCompressed = ctx.Runtime.CacheCompressed + config.RafsMode = ctx.Runtime.RafsMode + config.EnablePrefetch = enablePrefetch + config.DigestValidate = false + config.AmplifyIO = ctx.Runtime.AmplifyIO + err = nydusd.MountByAPI(config) + require.NoError(t, err) + + nydusd.VerifyByPath(t, layer.FileTree, config.MountPath) + + db, err := sql.Open("sqlite3", config.ChunkDedupDb) + require.NoError(t, err) + defer db.Close() + + // Mock nydus snapshotter clear cache + os.RemoveAll(filepath.Join(ctx.Env.WorkDir, "cache")) + time.Sleep(1 * time.Second) + + nydusd.UmountByAPI(config.MountPath) + + for _, expectedTable := range []string{"Blobs", "Chunks"} { + var count int + query := fmt.Sprintf("SELECT COUNT(*) FROM %s;", expectedTable) + err := db.QueryRow(query).Scan(&count) + require.NoError(t, err) + require.Zero(t, count) + } +} + +func TestCas(t *testing.T) { + test.Run(t, &CasTestSuite{}) +} diff --git a/smoke/tests/chunk_dedup_test.go b/smoke/tests/chunk_dedup_test.go new file mode 100644 index 00000000000..eca4d362e86 --- /dev/null +++ b/smoke/tests/chunk_dedup_test.go @@ -0,0 +1,125 @@ +// Copyright 2024 Nydus Developers. All rights reserved. 
+// +// SPDX-License-Identifier: Apache-2.0 + +package tests + +import ( + "context" + "encoding/json" + "io" + "net" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/dragonflyoss/nydus/smoke/tests/texture" + "github.com/dragonflyoss/nydus/smoke/tests/tool" + "github.com/dragonflyoss/nydus/smoke/tests/tool/test" +) + +const ( + paramIteration = "iteration" +) + +type ChunkDedupTestSuite struct{} + +type BackendMetrics struct { + ReadCount uint64 `json:"read_count"` + ReadAmountTotal uint64 `json:"read_amount_total"` + ReadErrors uint64 `json:"read_errors"` +} + +func (c *ChunkDedupTestSuite) TestChunkDedup() test.Generator { + scenarios := tool.DescartesIterator{} + scenarios.Dimension(paramIteration, []interface{}{1}) + + file, _ := os.CreateTemp("", "cas-*.db") + defer os.Remove(file.Name()) + + return func() (name string, testCase test.Case) { + if !scenarios.HasNext() { + return + } + scenario := scenarios.Next() + + return scenario.Str(), func(t *testing.T) { + c.testRemoteWithDedup(t, file.Name()) + } + } +} + +func (c *ChunkDedupTestSuite) testRemoteWithDedup(t *testing.T, dbPath string) { + ctx, layer := texture.PrepareLayerWithContext(t) + defer ctx.Destroy(t) + ctx.Runtime.EnablePrefetch = false + ctx.Runtime.ChunkDedupDb = dbPath + + nydusd, err := tool.NewNydusdWithContext(*ctx) + require.NoError(t, err) + err = nydusd.Mount() + require.NoError(t, err) + defer nydusd.Umount() + nydusd.Verify(t, layer.FileTree) + metrics := c.getBackendMetrics(t, filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock")) + require.Zero(t, metrics.ReadErrors) + + ctx2, layer2 := texture.PrepareLayerWithContext(t) + defer ctx2.Destroy(t) + ctx2.Runtime.EnablePrefetch = false + ctx2.Runtime.ChunkDedupDb = dbPath + + nydusd2, err := tool.NewNydusdWithContext(*ctx2) + require.NoError(t, err) + err = nydusd2.Mount() + require.NoError(t, err) + defer nydusd2.Umount() + nydusd2.Verify(t, layer2.FileTree) + metrics2 := c.getBackendMetrics(t, filepath.Join(ctx2.Env.WorkDir, "nydusd-api.sock")) + require.Zero(t, metrics2.ReadErrors) + + require.Greater(t, metrics.ReadCount, metrics2.ReadCount) + require.Greater(t, metrics.ReadAmountTotal, metrics2.ReadAmountTotal) +} + +func (c *ChunkDedupTestSuite) getBackendMetrics(t *testing.T, sockPath string) *BackendMetrics { + transport := &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + dialer := &net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 5 * time.Second, + } + return dialer.DialContext(ctx, "unix", sockPath) + }, + } + + client := &http.Client{ + Timeout: 30 * time.Second, + Transport: transport, + } + + resp, err := client.Get("http://unix/api/v1/metrics/backend") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var metrics BackendMetrics + if err = json.Unmarshal(body, &metrics); err != nil { + require.NoError(t, err) + } + + return &metrics +} + +func TestChunkDedup(t *testing.T) { + test.Run(t, &ChunkDedupTestSuite{}) +} diff --git a/smoke/tests/texture/layer.go b/smoke/tests/texture/layer.go index c6a4933b3fe..ab64bff2a85 100644 --- a/smoke/tests/texture/layer.go +++ b/smoke/tests/texture/layer.go @@ -10,7 +10,10 @@ import ( "syscall" "testing" + "github.com/containerd/nydus-snapshotter/pkg/converter" "github.com/dragonflyoss/nydus/smoke/tests/tool" + 
"github.com/opencontainers/go-digest" + "github.com/stretchr/testify/require" ) type LayerMaker func(t *testing.T, layer *tool.Layer) @@ -135,3 +138,28 @@ func MakeMatrixLayer(t *testing.T, workDir, id string) *tool.Layer { return layer } + +func PrepareLayerWithContext(t *testing.T) (*tool.Context, *tool.Layer) { + ctx := tool.DefaultContext(t) + + // Prepare work directory + ctx.PrepareWorkDir(t) + + lowerLayer := MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "source")) + lowerOCIBlobDigest, lowerRafsBlobDigest := lowerLayer.PackRef(t, *ctx, ctx.Env.BlobDir, ctx.Build.OCIRefGzip) + mergeOption := converter.MergeOption{ + BuilderPath: ctx.Binary.Builder, + ChunkDictPath: "", + OCIRef: true, + } + actualDigests, lowerBootstrap := tool.MergeLayers(t, *ctx, mergeOption, []converter.Layer{ + { + Digest: lowerRafsBlobDigest, + OriginalDigest: &lowerOCIBlobDigest, + }, + }) + require.Equal(t, []digest.Digest{lowerOCIBlobDigest}, actualDigests) + + ctx.Env.BootstrapPath = lowerBootstrap + return ctx, lowerLayer +} diff --git a/smoke/tests/tool/context.go b/smoke/tests/tool/context.go index 04cf6d851cf..b1215f59b7d 100644 --- a/smoke/tests/tool/context.go +++ b/smoke/tests/tool/context.go @@ -39,6 +39,7 @@ type RuntimeContext struct { RafsMode string EnablePrefetch bool AmplifyIO uint64 + ChunkDedupDb string } type EnvContext struct { diff --git a/smoke/tests/tool/nydusd.go b/smoke/tests/tool/nydusd.go index c340cce88b1..344588421c6 100644 --- a/smoke/tests/tool/nydusd.go +++ b/smoke/tests/tool/nydusd.go @@ -74,6 +74,7 @@ type NydusdConfig struct { AccessPattern bool PrefetchFiles []string AmplifyIO uint64 + ChunkDedupDb string // Hot Upgrade config. Upgrade bool SupervisorSockPath string @@ -193,6 +194,9 @@ func newNydusd(conf NydusdConfig) (*Nydusd, error) { if len(conf.BootstrapPath) > 0 { args = append(args, "--bootstrap", conf.BootstrapPath) } + if len(conf.ChunkDedupDb) > 0 { + args = append(args, "--dedup-db", conf.ChunkDedupDb) + } if conf.Upgrade { args = append(args, "--upgrade") } @@ -276,6 +280,7 @@ func NewNydusdWithContext(ctx Context) (*Nydusd, error) { RafsMode: ctx.Runtime.RafsMode, DigestValidate: false, AmplifyIO: ctx.Runtime.AmplifyIO, + ChunkDedupDb: ctx.Runtime.ChunkDedupDb, } if err := makeConfig(NydusdConfigTpl, conf); err != nil { @@ -346,7 +351,6 @@ func (nydusd *Nydusd) MountByAPI(config NydusdConfig) error { ) return err - } func (nydusd *Nydusd) Umount() error { diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index fc5e4b7a6b8..ab138442f6b 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -26,6 +26,7 @@ use nydus_service::{ create_daemon, create_fuse_daemon, create_vfs_backend, validate_threads_configuration, Error as NydusError, FsBackendMountCmd, FsBackendType, ServiceArgs, }; +use nydus_storage::cache::CasMgr; use crate::api_server_glue::ApiServerController; @@ -50,7 +51,7 @@ fn thread_validator(v: &str) -> std::result::Result { } fn append_fs_options(app: Command) -> Command { - app.arg( + let mut app = app.arg( Arg::new("bootstrap") .long("bootstrap") .short('B') @@ -87,7 +88,18 @@ fn append_fs_options(app: Command) -> Command { .help("Mountpoint within the FUSE/virtiofs device to mount the RAFS/passthroughfs filesystem") .default_value("/") .required(false), - ) + ); + + #[cfg(feature = "dedup")] + { + app = app.arg( + Arg::new("dedup-db") + .long("dedup-db") + .help("Database file for chunk deduplication"), + ); + } + + app } fn append_fuse_options(app: Command) -> Command { @@ -750,6 +762,13 @@ fn main() -> Result<()> { 
dump_program_info(); handle_rlimit_nofile_option(&args, "rlimit-nofile")?; + #[cfg(feature = "dedup")] + if let Some(db) = args.get_one::("dedup-db") { + let mgr = CasMgr::new(db).map_err(|e| eother!(format!("{}", e)))?; + info!("Enable chunk deduplication by using database at {}", db); + CasMgr::set_singleton(mgr); + } + match args.subcommand_name() { Some("singleton") => { // Safe to unwrap because the subcommand is `singleton`. diff --git a/storage/Cargo.toml b/storage/Cargo.toml index a45ba0a1fe1..8636d5cb53c 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -58,7 +58,6 @@ regex = "1.7.0" toml = "0.5" [features] -default = ["dedup"] backend-localdisk = [] backend-localdisk-gpt = ["gpt", "backend-localdisk"] backend-localfs = [] diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index d30bcb1762b..43c65cc3f77 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -13,6 +13,7 @@ use std::collections::HashSet; use std::fs::File; use std::io::{ErrorKind, Read, Result}; use std::mem::ManuallyDrop; +use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -29,7 +30,7 @@ use tokio::runtime::Runtime; use crate::backend::BlobReader; use crate::cache::state::ChunkMap; use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr}; -use crate::cache::{BlobCache, BlobIoMergeState}; +use crate::cache::{BlobCache, BlobIoMergeState, CasMgr}; use crate::device::{ BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec, BlobObject, BlobPrefetchRequest, @@ -184,8 +185,10 @@ pub(crate) struct FileCacheEntry { pub(crate) blob_info: Arc, pub(crate) cache_cipher_object: Arc, pub(crate) cache_cipher_context: Arc, + pub(crate) cas_mgr: Option>, pub(crate) chunk_map: Arc, pub(crate) file: Arc, + pub(crate) file_path: Arc, pub(crate) meta: Option, pub(crate) metrics: Arc, pub(crate) prefetch_state: Arc, @@ -233,13 +236,16 @@ impl FileCacheEntry { } fn delay_persist_chunk_data(&self, chunk: Arc, buffer: Arc) { + let blob_info = self.blob_info.clone(); let delayed_chunk_map = self.chunk_map.clone(); let file = self.file.clone(); + let file_path = self.file_path.clone(); let metrics = self.metrics.clone(); let is_raw_data = self.is_raw_data; let is_cache_encrypted = self.is_cache_encrypted; let cipher_object = self.cache_cipher_object.clone(); let cipher_context = self.cache_cipher_context.clone(); + let cas_mgr = self.cas_mgr.clone(); metrics.buffered_backend_size.add(buffer.size() as u64); self.runtime.spawn_blocking(move || { @@ -291,6 +297,14 @@ impl FileCacheEntry { }; let res = Self::persist_cached_data(&file, offset, buf); Self::_update_chunk_pending_status(&delayed_chunk_map, chunk.as_ref(), res.is_ok()); + if let Some(mgr) = cas_mgr { + if let Err(e) = mgr.record_chunk(&blob_info, chunk.deref(), file_path.as_ref()) { + warn!( + "failed to record chunk state for dedup in delay_persist_chunk_data, {}", + e + ); + } + } }); } @@ -298,6 +312,14 @@ impl FileCacheEntry { let offset = chunk.uncompressed_offset(); let res = Self::persist_cached_data(&self.file, offset, buf); self.update_chunk_pending_status(chunk, res.is_ok()); + if let Some(mgr) = &self.cas_mgr { + if let Err(e) = mgr.record_chunk(&self.blob_info, chunk, self.file_path.as_ref()) { + warn!( + "failed to record chunk state for dedup in persist_chunk_data, {}", + e + ); + } + } } fn persist_cached_data(file: &Arc, offset: u64, buffer: 
&[u8]) -> Result<()> { @@ -1051,13 +1073,21 @@ impl FileCacheEntry { trace!("dispatch single io range {:?}", req); let mut blob_cci = BlobCCI::new(); for (i, chunk) in req.chunks.iter().enumerate() { - let is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) { + let mut is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) { Ok(true) => true, Ok(false) => false, Err(StorageError::Timeout) => false, // Retry if waiting for inflight IO timeouts Err(e) => return Err(einval!(e)), }; + if !is_ready { + if let Some(mgr) = self.cas_mgr.as_ref() { + is_ready = mgr.dedup_chunk(&self.blob_info, chunk.deref(), &self.file); + if is_ready { + self.update_chunk_pending_status(chunk.deref(), true); + } + } + } // Directly read chunk data from file cache into user buffer iff: // - the chunk is ready in the file cache // - data in the file cache is plaintext. @@ -1454,6 +1484,16 @@ impl FileCacheEntry { } } +impl Drop for FileCacheEntry { + fn drop(&mut self) { + if let Some(cas_mgr) = &self.cas_mgr { + if let Err(e) = cas_mgr.gc() { + warn!("cas_mgr gc failed: {}", e); + } + } + } +} + /// An enum to reuse existing buffers for IO operations, and CoW on demand. #[allow(dead_code)] enum DataBuffer { diff --git a/storage/src/cache/dedup/db.rs b/storage/src/cache/dedup/db.rs index 6daff37c70b..f0cf493296b 100644 --- a/storage/src/cache/dedup/db.rs +++ b/storage/src/cache/dedup/db.rs @@ -8,7 +8,7 @@ use std::path::Path; use r2d2::{Pool, PooledConnection}; use r2d2_sqlite::SqliteConnectionManager; -use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction}; +use rusqlite::{Connection, DropBehavior, OpenFlags, OptionalExtension, Transaction}; use super::Result; @@ -24,7 +24,8 @@ impl CasDb { } pub fn from_file(db_path: impl AsRef) -> Result { - let mgr = SqliteConnectionManager::file(db_path); + let mgr = SqliteConnectionManager::file(db_path) + .with_flags(OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE); let pool = r2d2::Pool::new(mgr)?; let conn = pool.get()?; @@ -128,7 +129,7 @@ impl CasDb { Ok(conn.last_insert_rowid() as u64) } - pub fn delete_blobs(&mut self, blobs: &[String]) -> Result<()> { + pub fn delete_blobs(&self, blobs: &[String]) -> Result<()> { let delete_blobs_sql = "DELETE FROM Blobs WHERE BlobId = (?1)"; let delete_chunks_sql = "DELETE FROM Chunks WHERE BlobId = (?1)"; let mut conn = self.get_connection()?; diff --git a/storage/src/cache/dedup/mod.rs b/storage/src/cache/dedup/mod.rs index f52a8fcc1de..64bb17d6e82 100644 --- a/storage/src/cache/dedup/mod.rs +++ b/storage/src/cache/dedup/mod.rs @@ -2,11 +2,26 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; +use std::fs::{File, OpenOptions}; use std::io::Error; +use std::path::Path; +use std::sync::{Arc, Mutex, RwLock}; + +use nydus_utils::digest::RafsDigest; + +use crate::cache::dedup::db::CasDb; +use crate::device::{BlobChunkInfo, BlobInfo}; +use crate::utils::copy_file_range; mod db; +lazy_static::lazy_static!( + static ref CAS_MGR: Mutex>> = Mutex::new(None); +); + /// Error codes related to local cas. #[derive(Debug)] pub enum CasError { @@ -47,3 +62,262 @@ impl From for CasError { /// Specialized `Result` for local cas. 
type Result = std::result::Result; + +pub struct CasMgr { + db: CasDb, + fds: RwLock>>, +} + +impl CasMgr { + pub fn new(db_path: impl AsRef) -> Result { + let db = CasDb::from_file(db_path.as_ref())?; + + Ok(CasMgr { + db, + fds: RwLock::new(HashMap::new()), + }) + } + + pub fn set_singleton(mgr: CasMgr) { + *CAS_MGR.lock().unwrap() = Some(Arc::new(mgr)); + } + + pub fn get_singleton() -> Option> { + CAS_MGR.lock().unwrap().clone() + } + + /// Deduplicate chunk data from existing data files. + /// + /// If any error happens, just pretend there's no source data available for dedup. + pub fn dedup_chunk( + &self, + blob: &BlobInfo, + chunk: &dyn BlobChunkInfo, + cache_file: &File, + ) -> bool { + let key = Self::chunk_key(blob, chunk); + if key.is_empty() { + return false; + } + + if let Ok(Some((path, offset))) = self.db.get_chunk_info(&key) { + let guard = self.fds.read().unwrap(); + let mut d_file = guard.get(&path).cloned(); + drop(guard); + + // Open the source file for dedup on demand. + if d_file.is_none() { + match OpenOptions::new().read(true).open(&path) { + Err(e) => warn!("failed to open dedup source file {}, {}", path, e), + Ok(f) => { + let mut guard = self.fds.write().unwrap(); + match guard.entry(path) { + Entry::Vacant(e) => { + let f = Arc::new(f); + e.insert(f.clone()); + d_file = Some(f); + } + Entry::Occupied(f) => { + // Somebody else has inserted the file, use it + d_file = Some(f.get().clone()); + } + } + } + } + } else if d_file.as_ref().unwrap().metadata().is_err() { + // If the blob file no longer exists, delete if from fds and db. + let mut guard = self.fds.write().unwrap(); + guard.remove(&path); + let blob_ids: &[String] = &[path]; + if let Err(e) = self.db.delete_blobs(&blob_ids) { + warn!("failed to delete blobs: {}", e); + } + return false; + } + + if let Some(f) = d_file { + match copy_file_range( + f, + offset, + cache_file, + chunk.uncompressed_offset(), + chunk.uncompressed_size() as usize, + ) { + Ok(_) => { + return true; + } + Err(e) => warn!("{e}"), + } + } + } + + false + } + + /// Add an available chunk data into the CAS database. + pub fn record_chunk( + &self, + blob: &BlobInfo, + chunk: &dyn BlobChunkInfo, + path: impl AsRef, + ) -> Result<()> { + let key = Self::chunk_key(blob, chunk); + if key.is_empty() { + return Ok(()); + } + + let path = path.as_ref().canonicalize()?; + let path = path.display().to_string(); + self.record_chunk_raw(&key, &path, chunk.uncompressed_offset()) + } + + pub fn record_chunk_raw(&self, chunk_id: &str, path: &str, offset: u64) -> Result<()> { + self.db.add_blob(path)?; + self.db.add_chunk(chunk_id, offset, path)?; + Ok(()) + } + + fn chunk_key(blob: &BlobInfo, chunk: &dyn BlobChunkInfo) -> String { + let id = chunk.chunk_id(); + if *id == RafsDigest::default() { + String::new() + } else { + blob.digester().to_string() + ":" + &chunk.chunk_id().to_string() + } + } + + /// Check if blobs in the database still exist on the filesystem and perform garbage collection. + pub fn gc(&self) -> Result<()> { + let all_blobs = self.db.get_all_blobs()?; + let mut blobs_not_exist = Vec::new(); + for (_, file_path) in all_blobs { + if !std::path::Path::new(&file_path).exists() { + blobs_not_exist.push(file_path); + } + } + + // If there are any non-existent blobs, delete them from the database. 
+ if !blobs_not_exist.is_empty() { + self.db.delete_blobs(&blobs_not_exist).map_err(|e| { + warn!("failed to delete blobs: {}", e); + e + })?; + } + + let mut guard = self.fds.write().unwrap(); + for path in blobs_not_exist { + // Remove the non-existent blob paths from the cache. + guard.remove(&path); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::device::BlobFeatures; + use crate::test::MockChunkInfo; + use crate::RAFS_DEFAULT_CHUNK_SIZE; + use std::io::{Read, Write}; + use vmm_sys_util::tempfile::TempFile; + + #[test] + fn test_cas_chunk_op() { + let dbfile = TempFile::new().unwrap(); + let tmpfile2 = TempFile::new().unwrap(); + let src_path = tmpfile2.as_path().display().to_string(); + let mgr = CasMgr::new(dbfile.as_path()).unwrap(); + + let blob = BlobInfo::new( + 1, + src_path.clone(), + 8192, + 8192, + RAFS_DEFAULT_CHUNK_SIZE as u32, + 1, + BlobFeatures::empty(), + ); + let mut chunk = MockChunkInfo::new(); + chunk.block_id = RafsDigest { data: [3u8; 32] }; + chunk.uncompress_offset = 0; + chunk.uncompress_size = 8192; + let chunk = Arc::new(chunk) as Arc; + + let buf = vec![0x9u8; 8192]; + let mut src_file = tmpfile2.as_file().try_clone().unwrap(); + src_file.write_all(&buf).unwrap(); + mgr.record_chunk(&blob, chunk.as_ref(), &src_path).unwrap(); + + let mut tmpfile3 = TempFile::new().unwrap().into_file(); + assert!(mgr.dedup_chunk(&blob, chunk.as_ref(), &tmpfile3)); + let mut buf2 = vec![0x0u8; 8192]; + tmpfile3.read_exact(&mut buf2).unwrap(); + assert_eq!(buf, buf2); + } + + #[test] + fn test_cas_dedup_chunk_failed() { + let dbfile = TempFile::new().unwrap(); + let mgr = CasMgr::new(dbfile.as_path()).unwrap(); + + let new_blob = BlobInfo::new( + 1, + "test_blob".to_string(), + 8192, + 8192, + RAFS_DEFAULT_CHUNK_SIZE as u32, + 1, + BlobFeatures::empty(), + ); + + let mut chunk = MockChunkInfo::new(); + chunk.block_id = RafsDigest::default(); + chunk.uncompress_offset = 0; + chunk.uncompress_size = 8192; + let chunk = Arc::new(chunk) as Arc; + + let tmpfile = TempFile::new().unwrap().into_file(); + + assert!(!mgr.dedup_chunk(&new_blob, chunk.as_ref(), &tmpfile)); + } + + #[test] + fn test_cas_gc() { + let dbfile = TempFile::new().unwrap(); + let mgr = CasMgr::new(dbfile.as_path()).unwrap(); + + let tmpfile = TempFile::new().unwrap(); + let blob_path = tmpfile + .as_path() + .canonicalize() + .unwrap() + .display() + .to_string(); + let blob = BlobInfo::new( + 1, + blob_path.clone(), + 8192, + 8192, + RAFS_DEFAULT_CHUNK_SIZE as u32, + 1, + BlobFeatures::empty(), + ); + let mut chunk = MockChunkInfo::new(); + chunk.block_id = RafsDigest { data: [3u8; 32] }; + chunk.uncompress_offset = 0; + chunk.uncompress_size = 8192; + let chunk = Arc::new(chunk) as Arc; + mgr.record_chunk(&blob, chunk.as_ref(), &blob_path).unwrap(); + + let all_blobs_before_gc = mgr.db.get_all_blobs().unwrap(); + assert_eq!(all_blobs_before_gc.len(), 1); + + drop(tmpfile); + mgr.gc().unwrap(); + + let all_blobs_after_gc = mgr.db.get_all_blobs().unwrap(); + assert_eq!(all_blobs_after_gc.len(), 0); + } +} diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index e6b8c5b80da..11c7ac1143d 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -21,8 +21,9 @@ use crate::cache::state::{ BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap, }; use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; -use crate::cache::{BlobCache, BlobCacheMgr}; +use crate::cache::{BlobCache, BlobCacheMgr, 
CasMgr}; use crate::device::{BlobFeatures, BlobInfo}; +use crate::utils::get_path_from_file; pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw"; pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data"; @@ -209,10 +210,19 @@ impl FileCacheEntry { reader.clone() }; + // Turn off chunk deduplication in case of cache data encryption is enabled or is tarfs. + let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs { + warn!("chunk deduplication trun off"); + None + } else { + CasMgr::get_singleton() + }; + let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?; let blob_uncompressed_size = blob_info.uncompressed_size(); let is_legacy_stargz = blob_info.is_legacy_stargz(); + let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id); let ( file, meta, @@ -221,7 +231,6 @@ impl FileCacheEntry { is_get_blob_object_supported, need_validation, ) = if is_tarfs { - let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id); let file = OpenOptions::new() .create(false) .write(false) @@ -231,7 +240,6 @@ impl FileCacheEntry { Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc; (file, None, chunk_map, true, true, false) } else { - let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id); let (chunk_map, is_direct_chunkmap) = Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?; // Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array. @@ -266,6 +274,7 @@ impl FileCacheEntry { ); return Err(einval!(msg)); } + let load_chunk_digest = need_validation || cas_mgr.is_some(); let meta = if blob_info.meta_ci_is_valid() || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED) { @@ -275,7 +284,7 @@ impl FileCacheEntry { Some(blob_meta_reader), Some(runtime.clone()), false, - need_validation, + load_chunk_digest, )?; Some(meta) } else { @@ -307,6 +316,16 @@ impl FileCacheEntry { (Default::default(), Default::default()) }; + let mut blob_data_file_path = String::new(); + if cas_mgr.is_some() { + blob_data_file_path = if let Some(path) = get_path_from_file(&file) { + path + } else { + warn!("can't get path from file"); + "".to_string() + } + } + trace!( "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}", mgr.cache_raw_data, @@ -322,8 +341,10 @@ impl FileCacheEntry { blob_info, cache_cipher_object, cache_cipher_context, + cas_mgr, chunk_map, file: Arc::new(file), + file_path: Arc::new(blob_data_file_path), meta, metrics: mgr.metrics.clone(), prefetch_state: Arc::new(AtomicU32::new(0)), diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs index 5b2285c9b0e..6b22386b2cd 100644 --- a/storage/src/cache/fscache/mod.rs +++ b/storage/src/cache/fscache/mod.rs @@ -15,13 +15,13 @@ use tokio::runtime::Runtime; use crate::backend::BlobBackend; use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta}; +use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX; use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap}; use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; -use crate::cache::{BlobCache, BlobCacheMgr}; +use crate::cache::{BlobCache, BlobCacheMgr, CasMgr}; use crate::device::{BlobFeatures, BlobInfo, BlobObject}; use crate::factory::BLOB_FACTORY; - -use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX; +use crate::utils::get_path_from_file; const FSCACHE_BLOBS_CHECK_NUM: u8 = 1; @@ -240,9 +240,18 @@ impl FileCacheEntry { }; let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?; + // Turn off chunk deduplication in 
case of tarfs. + let cas_mgr = if is_tarfs { + warn!("chunk deduplication trun off"); + None + } else { + CasMgr::get_singleton() + }; + let need_validation = mgr.need_validation && !blob_info.is_legacy_stargz() && blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST); + let load_chunk_digest = need_validation || cas_mgr.is_some(); let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id); let meta = if blob_info.meta_ci_is_valid() { FileCacheMeta::new( @@ -251,7 +260,7 @@ impl FileCacheEntry { Some(blob_meta_reader), None, true, - need_validation, + load_chunk_digest, )? } else { return Err(enosys!( @@ -266,13 +275,25 @@ impl FileCacheEntry { )?)); Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map); + let mut blob_data_file_path = String::new(); + if cas_mgr.is_some() { + blob_data_file_path = if let Some(path) = get_path_from_file(&file) { + path + } else { + warn!("can't get path from file"); + "".to_string() + } + } + Ok(FileCacheEntry { blob_id, blob_info: blob_info.clone(), cache_cipher_object: Default::default(), cache_cipher_context: Default::default(), + cas_mgr, chunk_map, file, + file_path: Arc::new(blob_data_file_path), meta: Some(meta), metrics: mgr.metrics.clone(), prefetch_state: Arc::new(AtomicU32::new(0)), diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs index 7d91862b78d..6a7417992f0 100644 --- a/storage/src/cache/mod.rs +++ b/storage/src/cache/mod.rs @@ -659,6 +659,12 @@ pub(crate) trait BlobCacheMgr: Send + Sync { fn check_stat(&self); } +#[cfg(feature = "dedup")] +pub use dedup::CasMgr; + +#[cfg(not(feature = "dedup"))] +pub struct CasMgr {} + #[cfg(test)] mod tests { use crate::device::{BlobChunkFlags, BlobFeatures}; diff --git a/storage/src/utils.rs b/storage/src/utils.rs index 726ad921cf4..d5ad9f074f1 100644 --- a/storage/src/utils.rs +++ b/storage/src/utils.rs @@ -1,14 +1,9 @@ // Copyright 2020 Ant Group. All rights reserved. +// Copyright 2024 Nydus Developers. All rights reserved. // // SPDX-License-Identifier: Apache-2.0 //! Utility helpers to support the storage subsystem. -use std::alloc::{alloc, Layout}; -use std::cmp::{self, min}; -use std::io::{ErrorKind, IoSliceMut, Result}; -use std::os::unix::io::RawFd; -use std::slice::from_raw_parts_mut; - use fuse_backend_rs::abi::fuse_abi::off64_t; use fuse_backend_rs::file_buf::FileVolatileSlice; #[cfg(target_os = "macos")] @@ -18,6 +13,16 @@ use nydus_utils::{ digest::{self, RafsDigest}, round_down_4k, }; +use std::alloc::{alloc, Layout}; +use std::cmp::{self, min}; +use std::io::{ErrorKind, IoSliceMut, Result}; +use std::os::fd::{AsFd, AsRawFd}; +use std::os::unix::io::RawFd; +#[cfg(target_os = "linux")] +use std::path::PathBuf; +use std::slice::from_raw_parts_mut; +#[cfg(target_os = "macos")] +use std::{ffi::CStr, mem, os::raw::c_char}; use vm_memory::bytes::Bytes; use crate::{StorageError, StorageResult}; @@ -97,6 +102,108 @@ pub fn copyv>( Ok((copied, (dst_index, dst_offset))) } +/// The copy_file_range system call performs an in-kernel copy between file descriptors src and dst +/// without the additional cost of transferring data from the kernel to user space and back again. +/// +/// There may be additional optimizations for specific file systems. It copies up to len bytes of +/// data from file descriptor fd_in to file descriptor fd_out, overwriting any data that exists +/// within the requested range of the target file. 
+#[cfg(target_os = "linux")] +pub fn copy_file_range( + src: impl AsFd, + src_off: u64, + dst: impl AsFd, + dst_off: u64, + mut len: usize, +) -> Result<()> { + let mut src_off = src_off as i64; + let mut dst_off = dst_off as i64; + + while len > 0 { + let ret = nix::fcntl::copy_file_range( + src.as_fd().as_raw_fd(), + Some(&mut src_off), + dst.as_fd().as_raw_fd(), + Some(&mut dst_off), + len, + )?; + if ret == 0 { + return Err(eio!("reach end of file when copy file range")); + } + len -= ret; + } + + Ok(()) +} + +#[cfg(not(target_os = "linux"))] +pub fn copy_file_range( + src: impl AsFd, + mut src_off: u64, + dst: impl AsFd, + mut dst_off: u64, + mut len: usize, +) -> Result<()> { + let buf_size = 4096; + let mut buf = vec![0u8; buf_size]; + + while len > 0 { + let bytes_to_read = buf_size.min(len); + let read_bytes = nix::sys::uio::pread( + src.as_fd().as_raw_fd(), + &mut buf[..bytes_to_read], + src_off as libc::off_t, + )?; + + if read_bytes == 0 { + return Err(eio!("reach end of file when read in copy_file_range")); + } + + let write_bytes = nix::sys::uio::pwrite( + dst.as_fd().as_raw_fd(), + &buf[..read_bytes], + dst_off as libc::off_t, + )?; + if write_bytes == 0 { + return Err(eio!("reach end of file when write in copy_file_range")); + } + + src_off += read_bytes as u64; + dst_off += read_bytes as u64; + len -= read_bytes; + } + + Ok(()) +} + +#[cfg(target_os = "linux")] +pub fn get_path_from_file(file: &impl AsRawFd) -> Option { + let path = PathBuf::from("/proc/self/fd").join(file.as_raw_fd().to_string()); + match std::fs::read_link(&path) { + Ok(v) => Some(v.display().to_string()), + Err(e) => { + warn!("Failed to get path from file descriptor: {}", e); + None + } + } +} + +#[cfg(target_os = "macos")] +pub fn get_path_from_file(file: &impl AsRawFd) -> Option { + let fd = file.as_raw_fd(); + let mut buf: [c_char; 1024] = unsafe { mem::zeroed() }; + + let result = unsafe { fcntl(fd, libc::F_GETPATH, buf.as_mut_ptr()) }; + + if result == -1 { + warn!("Failed to get path from file descriptor"); + return None; + } + + let cstr = unsafe { CStr::from_ptr(buf.as_ptr()) }; + cstr.to_str().ok().map(|s| s.to_string()) +} + /// An memory cursor to access an `FileVolatileSlice` array. 
pub struct MemSliceCursor<'a> { pub mem_slice: &'a [FileVolatileSlice<'a>], @@ -239,6 +346,8 @@ pub fn check_digest(data: &[u8], digest: &RafsDigest, digester: digest::Algorith #[cfg(test)] mod tests { use super::*; + use std::io::Write; + use vmm_sys_util::tempfile::TempFile; #[test] fn test_copyv() { @@ -372,4 +481,34 @@ mod tests { assert_eq!(cursor.index, 2); assert_eq!(cursor.offset, 0); } + + #[test] + fn test_copy_file_range() { + let mut src = TempFile::new().unwrap().into_file(); + let dst = TempFile::new().unwrap(); + + let buf = vec![8u8; 4096]; + src.write_all(&buf).unwrap(); + copy_file_range(&src, 0, dst.as_file(), 4096, 4096).unwrap(); + assert_eq!(dst.as_file().metadata().unwrap().len(), 8192); + + let small_buf = vec![8u8; 2048]; + let mut small_src = TempFile::new().unwrap().into_file(); + small_src.write_all(&small_buf).unwrap(); + assert!(copy_file_range(&small_src, 0, dst.as_file(), 4096, 4096).is_err()); + + let empty_src = TempFile::new().unwrap().into_file(); + assert!(copy_file_range(&empty_src, 0, dst.as_file(), 4096, 4096).is_err()); + } + + #[test] + fn test_get_path_from_file() { + let temp_file = TempFile::new().unwrap(); + let file = temp_file.as_file(); + let path = get_path_from_file(file).unwrap(); + assert_eq!(path, temp_file.as_path().display().to_string()); + + let invalid_fd: RawFd = -1; + assert!(get_path_from_file(&invalid_fd).is_none()); + } } diff --git a/utils/src/digest.rs b/utils/src/digest.rs index 12e74486f3b..2f04ca69909 100644 --- a/utils/src/digest.rs +++ b/utils/src/digest.rs @@ -217,7 +217,7 @@ impl AsRef<[u8]> for RafsDigest { impl fmt::Display for RafsDigest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { for c in &self.data { - write!(f, "{:02x}", c).unwrap() + write!(f, "{:02x}", c)?; } Ok(()) }