diff --git a/.gitignore b/.gitignore
index 88aa9df399b1..6dcf958fa451 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,26 +5,37 @@ ltmain.sh
 *.rej
 .deps
 .dirstamp
-.vscode
-.idea
 fstests/secfs.test
 fstests/flock
 !fstests/Makefile
 jfs
-/juicefs
-/juicefs.ceph
-/juicefs.exe
-/juicefs.lite
-dist/
 *.rdb
 .release-env
 *.so
 libjfs.h
-.DS_Store
 docs/node_modules
 cmd/cmd
-*.dump
-*.out
 .hypothesis
-__pycache__
 /node_modules
+
+# os
+.DS_Store
+
+# ide
+.vscode
+.idea
+
+# lang
+__pycache__
+
+# temp
+pkg/meta/badger
+*.dump
+*.out
+
+# gen
+/juicefs
+/juicefs.ceph
+/juicefs.exe
+/juicefs.lite
+dist/
diff --git a/cmd/compact_test.go b/cmd/compact_test.go
new file mode 100644
index 000000000000..1dac61ac599e
--- /dev/null
+++ b/cmd/compact_test.go
@@ -0,0 +1,110 @@
+/*
+ * JuiceFS, Copyright 2020 Juicedata, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+    "fmt"
+    "github.com/stretchr/testify/assert"
+    "os"
+    "path/filepath"
+    "strings"
+    "testing"
+)
+
+func createTestFile(path string, size int) error {
+    file, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
+    if err != nil {
+        return err
+    }
+    defer file.Close()
+
+    content := []byte(strings.Repeat("a", size/2))
+    for i := 0; i < 2; i++ {
+        if _, err = file.Write(content); err != nil {
+            return err
+        }
+        if err = file.Sync(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+type testDir struct {
+    path     string
+    fileCnt  int
+    fileSize int
+}
+
+func initForCompactTest(mountDir string, dirs []testDir) {
+    for _, d := range dirs {
+        dirPath := filepath.Join(mountDir, d.path)
+
+        err := os.MkdirAll(dirPath, 0755)
+        if err != nil {
+            panic(err)
+        }
+
+        for i := 0; i < d.fileCnt; i++ {
+            if err := createTestFile(filepath.Join(dirPath, fmt.Sprintf("%d", i)), d.fileSize); err != nil {
+                panic(err)
+            }
+        }
+    }
+}
+
+func TestCompact(t *testing.T) {
+    var bucket string
+    mountTemp(t, &bucket, []string{"--trash-days=0"}, nil)
+    defer umountTemp(t)
+
+    dirs := []testDir{
+        {
+            path:     "d1/d11",
+            fileCnt:  10,
+            fileSize: 10,
+        },
+        {
+            path:     "d1",
+            fileCnt:  20,
+            fileSize: 10,
+        },
+        {
+            path:     "d2",
+            fileCnt:  5,
+            fileSize: 10,
+        },
+    }
+    initForCompactTest(testMountPoint, dirs)
+    dataDir := filepath.Join(bucket, testVolume, "chunks")
+
+    beforeCompactFileNum := getFileCount(dataDir)
+    // compact a single file
+    err := Main([]string{"", "compact", fmt.Sprintf("--path=%s", filepath.Join(testMountPoint, "d1", "1")), testMeta})
+    assert.Nil(t, err)
+
+    // compact directories
+    for _, d := range dirs {
+        err := Main([]string{"", "compact", fmt.Sprintf("--path=%s", filepath.Join(testMountPoint, d.path)), testMeta})
+        assert.Nil(t, err)
+    }
+
+    afterCompactFileNum := getFileCount(dataDir)
+    if beforeCompactFileNum <= afterCompactFileNum {
+        t.Fatalf("number of blocks before compact %d <= after %d", beforeCompactFileNum, afterCompactFileNum)
+    }
+}
diff --git a/cmd/compact_unix.go b/cmd/compact_unix.go
new file mode 100644
index 000000000000..eaf2c98b244f
--- /dev/null
+++ b/cmd/compact_unix.go
@@ -0,0 +1,203 @@
+//go:build !windows
+// +build !windows
+
+/*
+ * JuiceFS, Copyright 2020 Juicedata, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+    "fmt"
+    "github.com/juicedata/juicefs/pkg/chunk"
+    "github.com/juicedata/juicefs/pkg/meta"
+    "github.com/juicedata/juicefs/pkg/utils"
+    "github.com/juicedata/juicefs/pkg/vfs"
+    "github.com/urfave/cli/v2"
+    "sync"
+    "time"
+)
+
+func cmdCompact() *cli.Command {
+    return &cli.Command{
+        Name:      "compact",
+        Action:    compact,
+        Category:  "ADMIN",
+        Usage:     "Trigger compaction of slices",
+        ArgsUsage: "META-URL",
+        Description: `
+ Examples:
+ # compact with path
+ $ juicefs compact --path /mnt/jfs/foo redis://localhost
+
+ # max depth of 5
+ $ juicefs compact --path /mnt/jfs/foo --depth 5 redis://localhost
+ `,
+        Flags: []cli.Flag{
+            &cli.StringSliceFlag{
+                Name:     "path",
+                Required: true,
+                Usage:    "path to compact",
+            },
+            &cli.UintFlag{
+                Name:    "depth",
+                Aliases: []string{"d"},
+                Value:   2,
+                Usage:   "maximum depth of directories to scan",
+            },
+            &cli.IntFlag{
+                Name:    "compact-concurrency",
+                Aliases: []string{"cc"},
+                Value:   10,
+                Usage:   "number of concurrent compactions",
+            },
+            &cli.IntFlag{
+                Name:    "delete-concurrency",
+                Aliases: []string{"dc"},
+                Value:   10,
+                Usage:   "number of concurrent deletions",
+            },
+        },
+    }
+}
+
+func compact(ctx *cli.Context) error {
+    setup(ctx, 1)
+    removePassword(ctx.Args().Get(0))
+
+    // parse flags
+    metaUri := ctx.Args().Get(0)
+    paths := ctx.StringSlice("path")
+
+    depth := ctx.Uint("depth")
+    if depth > 10 {
+        logger.Warn("depth should be <= 10")
+        depth = 10
+    }
+
+    deleteConcurrency := ctx.Int("delete-concurrency")
+    if deleteConcurrency <= 0 {
+        logger.Warn("delete-concurrency should be > 0")
+        deleteConcurrency = 1
+    }
+
+    compactConcurrency := ctx.Int("compact-concurrency")
+    if compactConcurrency <= 0 {
+        logger.Warn("compact-concurrency should be > 0")
+        compactConcurrency = 1
+    }
+
+    // new meta client
+    metaConf := meta.DefaultConf()
+    metaConf.MaxDeletes = deleteConcurrency
+    client := meta.NewClient(metaUri, metaConf)
+    metaFormat, err := client.Load(true)
+    if err != nil {
+        logger.Fatalf("load setting: %s", err)
+    }
+
+    chunkConf := chunk.Config{
+        BlockSize:  metaFormat.BlockSize * 1024,
+        Compress:   metaFormat.Compression,
+        GetTimeout: time.Second * 60,
+        PutTimeout: time.Second * 60,
+        MaxUpload:  20,
+        BufferSize: 300 << 20,
+        CacheDir:   "memory",
+    }
+    blob, err := createStorage(*metaFormat)
+    if err != nil {
+        logger.Fatalf("object storage: %s", err)
+    }
+    logger.Infof("Data use %s", blob)
+    store := chunk.NewCachedStore(blob, chunkConf, nil)
+
+    progress := utils.NewProgress(false)
+
+    // handle slices deleted by compaction
+    var wg sync.WaitGroup
+    delSpin := progress.AddCountSpinner("Cleaned pending slices")
+    sliceChan := make(chan meta.Slice, 10240)
+    client.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
+        delSpin.Increment()
+        sliceChan <- meta.Slice{Id: args[0].(uint64), Size: args[1].(uint32)}
+        return nil
+    })
+    for i := 0; i < deleteConcurrency; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            for s := range sliceChan {
+                if err := store.Remove(s.Id, int(s.Size)); err != nil {
+                    logger.Warnf("remove %d_%d: %s", s.Id, s.Size, err)
+                }
+            }
+        }()
+    }
+
+    for i := 0; i < len(paths); i++ {
+        path := paths[i]
+
+        // resolve path to inode
+        inodeNo, err := utils.GetInode(path)
+        if err != nil {
+            logger.Fatal(err)
+        }
+        inode := meta.Ino(inodeNo)
+
+        if !inode.IsValid() {
+            logger.Fatalf("inode number %d is not valid", inode)
+        }
+
+        logger.Debugf("compact path: %v, inode %v", path, inode)
+
+        // do meta.Compact
+        bar := progress.AddCountBar(fmt.Sprintf("compacted chunks for %s", path), 0)
+        spin := progress.AddDoubleSpinnerTwo(fmt.Sprintf("compacted slices for %s", path), "compacted data")
+        client.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
+            slices := args[0].([]meta.Slice)
+            err := vfs.Compact(chunkConf, store, slices, args[1].(uint64))
+            for _, s := range slices {
+                spin.IncrInt64(int64(s.Len))
+            }
+            return err
+        })
+
+        compactErr := client.Compact(meta.Background, inode, int(depth), compactConcurrency,
+            func() {
+                bar.IncrTotal(1)
+            },
+            func() {
+                bar.Increment()
+            })
+        if compactErr == 0 {
+            if progress.Quiet {
+                c, b := spin.Current()
+                logger.Infof("compacted [%s] %d chunks (%d slices, %d bytes).", path, bar.Current(), c, b)
+            }
+        } else {
+            logger.Errorf("compact [%s] chunks: %s", path, compactErr)
+        }
+        bar.Done()
+        spin.Done()
+    }
+
+    // stop the delete workers and wait for queued removals to finish
+    close(sliceChan)
+    wg.Wait()
+
+    progress.Done()
+    return nil
+}
diff --git a/cmd/compact_windows.go b/cmd/compact_windows.go
new file mode 100644
index 000000000000..02b4ef4e3171
--- /dev/null
+++ b/cmd/compact_windows.go
@@ -0,0 +1,72 @@
+//go:build windows
+// +build windows
+
+/*
+ * JuiceFS, Copyright 2020 Juicedata, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+    "github.com/urfave/cli/v2"
+)
+
+func cmdCompact() *cli.Command {
+    return &cli.Command{
+        Name:      "compact",
+        Action:    compact,
+        Category:  "ADMIN",
+        Usage:     "Trigger compaction of slices, not supported for Windows",
+        ArgsUsage: "META-URL",
+        Description: `
+ Examples:
+ # compact with path
+ $ juicefs compact --path /mnt/jfs/foo redis://localhost
+
+ # max depth of 5
+ $ juicefs compact --path /mnt/jfs/foo --depth 5 redis://localhost
+ `,
+        Flags: []cli.Flag{
+            &cli.StringSliceFlag{
+                Name:     "path",
+                Required: true,
+                Usage:    "path to compact",
+            },
+            &cli.UintFlag{
+                Name:    "depth",
+                Aliases: []string{"d"},
+                Value:   2,
+                Usage:   "maximum depth of directories to scan",
+            },
+            &cli.IntFlag{
+                Name:    "compact-concurrency",
+                Aliases: []string{"cc"},
+                Value:   10,
+                Usage:   "number of concurrent compactions",
+            },
+            &cli.IntFlag{
+                Name:    "delete-concurrency",
+                Aliases: []string{"dc"},
+                Value:   10,
+                Usage:   "number of concurrent deletions",
+            },
+        },
+    }
+}
+
+func compact(ctx *cli.Context) error {
+    logger.Warnf("juicefs compact is not supported on Windows")
+    return nil
+}
diff --git a/cmd/gc.go b/cmd/gc.go
index 06f66de70f08..c2d001ee5b0f 100644
--- a/cmd/gc.go
+++ b/cmd/gc.go
@@ -104,10 +104,10 @@ func gc(ctx *cli.Context) error {
 	// Scan all chunks first and do compaction if necessary
 	progress := utils.NewProgress(false)
 	// Delete pending slices while listing all slices
-	delete := ctx.Bool("delete")
+	deleteOp := ctx.Bool("delete")
 	threads := ctx.Int("threads")
-	compact := ctx.Bool("compact")
-	if (delete || compact) && threads <= 0 {
+	compactOp := ctx.Bool("compact")
+	if (deleteOp || compactOp) && threads <= 0 {
 		logger.Fatal("threads should be greater than 0 to delete or compact objects")
 	}
 
@@ -115,7 +115,7 @@ func gc(ctx *cli.Context) error {
 
 	var delSpin *utils.Bar
 	var sliceChan chan meta.Slice // pending delete slices
-	if delete || compact {
+	if deleteOp || compactOp {
 		delSpin = progress.AddCountSpinner("Cleaned pending slices")
 		sliceChan = make(chan meta.Slice, 10240)
 		m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
@@ -140,7 +140,7 @@ func gc(ctx *cli.Context) error {
 	delayedFileSpin := progress.AddDoubleSpinnerTwo("Pending deleted files", "Pending deleted data")
 	cleanedFileSpin := progress.AddDoubleSpinnerTwo("Cleaned pending files", "Cleaned pending data")
 	edge := time.Now().Add(-time.Duration(format.TrashDays) * 24 * time.Hour)
-	if delete {
+	if deleteOp {
 		cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
 		m.CleanupTrashBefore(c, edge, cleanTrashSpin.IncrBy)
 		cleanTrashSpin.Done()
@@ -155,7 +155,7 @@ func gc(ctx *cli.Context) error {
 		nil, nil, nil,
 		func(_ meta.Ino, size uint64, ts int64) (bool, error) {
 			delayedFileSpin.IncrInt64(int64(size))
-			if delete {
+			if deleteOp {
 				cleanedFileSpin.IncrInt64(int64(size))
 				return true, nil
 			}
@@ -168,7 +168,7 @@ func gc(ctx *cli.Context) error {
 	delayedFileSpin.Done()
 	cleanedFileSpin.Done()
 
-	if compact {
+	if compactOp {
 		bar := progress.AddCountBar("Compacted chunks", 0)
 		spin := progress.AddDoubleSpinnerTwo("Compacted slices", "Compacted data")
 		m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
@@ -200,7 +200,7 @@ func gc(ctx *cli.Context) error {
 
 	// List all slices in metadata engine
 	slices := make(map[meta.Ino][]meta.Slice)
-	r := m.ListSlices(c, slices, delete, sliceCSpin.Increment)
+	r := m.ListSlices(c, slices, deleteOp, sliceCSpin.Increment)
 	if r != 0 {
 		logger.Fatalf("list all slices: %s", r)
 	}
@@ -213,11 +213,11 @@ func gc(ctx *cli.Context) error {
 		func(ss []meta.Slice, ts int64) (bool, error) {
 			for _, s := range ss {
 				delayedSliceSpin.IncrInt64(int64(s.Size))
-				if delete && ts < edge.Unix() {
+				if deleteOp && ts < edge.Unix() {
 					cleanedSliceSpin.IncrInt64(int64(s.Size))
 				}
 			}
-			if delete && ts < edge.Unix() {
+			if deleteOp && ts < edge.Unix() {
 				return true, nil
 			}
 			return false, nil
@@ -289,7 +289,7 @@ func gc(ctx *cli.Context) error {
 	foundLeaked := func(obj object.Object) {
 		bar.IncrTotal(1)
 		leaked.IncrInt64(obj.Size())
-		if delete {
+		if deleteOp {
 			leakedObj <- obj.Key()
 		}
 	}
@@ -361,7 +361,7 @@ func gc(ctx *cli.Context) error {
 	}
 	close(leakedObj)
 	wg.Wait()
-	if delete || compact {
+	if deleteOp || compactOp {
 		delSpin.Done()
 		if progress.Quiet {
 			logger.Infof("Deleted %d pending slices", delSpin.Current())
@@ -378,7 +378,7 @@ func gc(ctx *cli.Context) error {
 	fc, fb := cleanedFileSpin.Current()
 	logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d delslices (%d bytes), %d delfiles (%d bytes), %d skipped (%d bytes)",
 		bar.Current(), vc, cc, cb, lc, lb, dsc, dsb, fc, fb, sc, sb)
-	if lc > 0 && !delete {
+	if lc > 0 && !deleteOp {
 		logger.Infof("Please add `--delete` to clean leaked objects")
 	}
 	return nil
diff --git a/cmd/main.go b/cmd/main.go
index 71df5111ca27..9dca71798094 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -83,6 +83,7 @@ func Main(args []string) error {
 			cmdDebug(),
 			cmdClone(),
 			cmdSummary(),
+			cmdCompact(),
 		},
 	}
 
diff --git a/cmd/summary.go b/cmd/summary.go
index 49d24dcf79a7..fb2eaebe8a64 100644
--- a/cmd/summary.go
+++ b/cmd/summary.go
@@ -42,11 +42,11 @@ func cmdSummary() *cli.Command {
 		ArgsUsage: "PATH",
 		Description: `
 It is used to show tree summary of target directory.
-
+
 Examples:
 # Show with path
 $ juicefs summary /mnt/jfs/foo
-
+
 # Show max depth of 5
 $ juicefs summary --depth 5 /mnt/jfs/foo
 
@@ -67,7 +67,7 @@ func cmdSummary() *cli.Command {
 			Name:    "entries",
 			Aliases: []string{"e"},
 			Value:   10,
-			Usage:   "show top N entries (sort by size)",
+			Usage:   "show top N entries per dir (sorted by size)",
 		},
 		&cli.BoolFlag{
 			Name:    "strict",
@@ -186,7 +186,7 @@ func renderTree(results *[][]string, tree *meta.TreeSummary, csv bool) {
 	if csv {
 		size = strconv.FormatUint(tree.Size, 10)
 	} else {
-		size = humanize.IBytes(uint64(tree.Size))
+		size = humanize.IBytes(tree.Size)
 	}
 
 	path := tree.Path
diff --git a/go.mod b/go.mod
index 2f7457c909a5..72a6182de07c 100644
--- a/go.mod
+++ b/go.mod
@@ -56,6 +56,7 @@ require (
 	github.com/redis/go-redis/v9 v9.0.2
 	github.com/sirupsen/logrus v1.9.0
 	github.com/smartystreets/goconvey v1.7.2
+	github.com/stretchr/testify v1.8.4
 	github.com/studio-b12/gowebdav v0.0.0-20230203202212-3282f94193f2
 	github.com/tencentyun/cos-go-sdk-v5 v0.7.45
 	github.com/tikv/client-go/v2 v2.0.4
@@ -112,6 +113,7 @@ require (
 	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
 	github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/dchest/siphash v1.2.1 // indirect
 	github.com/dgraph-io/ristretto v0.1.1 // indirect
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
@@ -172,7 +174,7 @@ require (
 	github.com/klauspost/reedsolomon v1.9.11 // indirect
 	github.com/kr/fs v0.1.0 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
-	github.com/mattn/go-colorable v0.1.12 // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-runewidth v0.0.13 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
 	github.com/miekg/dns v1.1.41 // indirect
@@ -198,6 +200,7 @@ require (
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
 	github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
 	github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 // indirect
 	github.com/prometheus/procfs v0.11.0 // indirect
@@ -246,6 +249,7 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/square/go-jose.v2 v2.3.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
 	xorm.io/builder v0.3.7 // indirect
 )
 
diff --git a/go.sum b/go.sum
index efe3d05b3102..0613725c12e3 100644
--- a/go.sum
+++ b/go.sum
@@ -693,8 +693,9 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
 github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
 github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
-github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
 github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -702,6 +703,7 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
 github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
 github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
@@ -1362,6 +1364,7 @@ golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/pkg/meta/base.go b/pkg/meta/base.go
index 03215e9e3d14..4273ae8248be 100644
--- a/pkg/meta/base.go
+++ b/pkg/meta/base.go
@@ -64,6 +64,7 @@ type engine interface {
 	doInit(format *Format, force bool) error
 	scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) error
+	scanChunks(ctx Context, inode Ino, ch chan<- cchunk) error
 	compactChunk(inode Ino, indx uint32, force bool)
 	doDeleteSustainedInode(sid uint64, inode Ino) error
 	doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
@@ -395,24 +396,24 @@ func (m *baseMeta) doFlushDirStat() {
 	}
 }
 
-func (r *baseMeta) txLock(idx uint) {
-	r.txlocks[idx%nlocks].Lock()
+func (m *baseMeta) txLock(idx uint) {
+	m.txlocks[idx%nlocks].Lock()
 }
 
-func (r *baseMeta) txUnlock(idx uint) {
-	r.txlocks[idx%nlocks].Unlock()
+func (m *baseMeta) txUnlock(idx uint) {
+	m.txlocks[idx%nlocks].Unlock()
 }
 
-func (r *baseMeta) OnMsg(mtype uint32, cb MsgCallback) {
-	r.msgCallbacks.Lock()
-	defer r.msgCallbacks.Unlock()
-	r.msgCallbacks.callbacks[mtype] = cb
+func (m *baseMeta) OnMsg(mtype uint32, cb MsgCallback) {
+	m.msgCallbacks.Lock()
+	defer m.msgCallbacks.Unlock()
+	m.msgCallbacks.callbacks[mtype] = cb
 }
 
-func (r *baseMeta) newMsg(mid uint32, args ...interface{}) error {
-	r.msgCallbacks.Lock()
-	cb, ok := r.msgCallbacks.callbacks[mid]
-	r.msgCallbacks.Unlock()
+func (m *baseMeta) newMsg(mid uint32, args ...interface{}) error {
+	m.msgCallbacks.Lock()
+	cb, ok := m.msgCallbacks.callbacks[mid]
+	m.msgCallbacks.Unlock()
 	if ok {
 		return cb(args...)
 	}
@@ -1225,7 +1226,7 @@ func clearSUGID(ctx Context, cur *Attr, set *Attr) {
 	}
 }
 
-func (r *baseMeta) Resolve(ctx Context, parent Ino, path string, inode *Ino, attr *Attr) syscall.Errno {
+func (m *baseMeta) Resolve(ctx Context, parent Ino, path string, inode *Ino, attr *Attr) syscall.Errno {
 	return syscall.ENOTSUP
 }
 
@@ -2197,12 +2198,12 @@ func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.
 	for i := 0; i < threads; i++ {
 		wg.Add(1)
 		go func() {
+			defer wg.Done()
 			for c := range ch {
 				logger.Debugf("Compacting chunk %d:%d (%d slices)", c.inode, c.indx, c.slices)
 				m.en.compactChunk(c.inode, c.indx, true)
 				bar.Increment()
 			}
-			wg.Done()
 		}()
 	}
 
@@ -2216,6 +2217,44 @@
 	return 0
 }
 
+func (m *baseMeta) Compact(ctx Context, inode Ino, maxDepth int, compactConcurrency int, preFunc, postFunc func()) syscall.Errno {
+	inodes, err := m.GetFileInodes(ctx, inode, maxDepth)
+	if err != 0 {
+		return err
+	}
+
+	// start concurrent compaction workers
+	var wg sync.WaitGroup
+	ch := make(chan cchunk, 10000)
+	for i := 0; i < compactConcurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for c := range ch {
+				logger.Debugf("Compacting chunk %d:%d (%d slices)", c.inode, c.indx, c.slices)
+				preFunc()
+				m.en.compactChunk(c.inode, c.indx, true)
+				postFunc()
+			}
+		}()
+	}
+
+	// scan the chunks of each file and feed them to the workers
+	var scanErr error
+	for _, ino := range inodes {
+		logger.Debugf("scan chunks [inode %v]", ino)
+		scanErr = m.en.scanChunks(ctx, ino, ch)
+		if scanErr != nil {
+			logger.Warnf("scan inode %d chunks error: %v", ino, scanErr)
+			break
+		}
+	}
+
+	close(ch)
+	wg.Wait()
+	return errno(scanErr)
+}
+
 func (m *baseMeta) fileDeleted(opened, force bool, inode Ino, length uint64) {
 	if opened {
 		m.Lock()
diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go
index 55a45700508a..ba870b570693 100644
--- a/pkg/meta/base_test.go
+++ b/pkg/meta/base_test.go
@@ -36,6 +36,7 @@ import (
 
 	"github.com/juicedata/juicefs/pkg/utils"
 	"github.com/redis/go-redis/v9"
+	"github.com/stretchr/testify/assert"
 	"xorm.io/xorm"
 )
 
@@ -148,6 +149,8 @@ func testMeta(t *testing.T, m Meta) {
 	testClone(t, m)
 	base.conf.ReadOnly = true
 	testReadOnly(t, m)
+	testGetInodes(t, m)
+	testScanChunks(t, m)
 }
 
 func testMetaClient(t *testing.T, m Meta) {
@@ -2108,6 +2111,243 @@ func testCheckAndRepair(t *testing.T, m Meta) {
 	}
 }
 
+func testGetInodes(t *testing.T, m Meta) {
+	mainDir := "testGetInodes"
+	var mainInode Ino
+	if st := m.Mkdir(Background, RootInode, mainDir, 0640, 022, 0, &mainInode, nil); st != 0 {
+		t.Fatalf("mkdir: %s", st)
+	}
+	defer m.Rmdir(Background, RootInode, mainDir)
+
+	type dir struct {
+		name    string
+		inode   Ino
+		fileCnt int
+		files   []Ino
+		subDirs []*dir
+	}
+
+	testDir := &dir{
+		name: "d",
+		subDirs: []*dir{
+			{
+				name:    "d1",
+				fileCnt: 1,
+				subDirs: []*dir{
+					{
+						name:    "d11",
+						fileCnt: 1,
+					},
+					{
+						name:    "d12",
+						fileCnt: 2,
+					},
+				},
+			},
+			{
+				name:    "d2",
+				fileCnt: 2,
+			},
+			{
+				name:    "d3",
+				fileCnt: 3,
+			},
+		},
+	}
+
+	var buildDir func(d *dir, pIno Ino) Ino
+	buildDir = func(d *dir, pIno Ino) Ino {
+		// create the directory itself
+		if st := m.Mkdir(Background, pIno, d.name, 0640, 022, 0, &d.inode, nil); st != 0 {
+			t.Fatalf("mkdir: %s", st)
+		}
+
+		// create plain files in it
+		for i := 0; i < d.fileCnt; i++ {
+			var fileInode Ino
+			if st := m.Create(Background, d.inode, fmt.Sprintf("f%d.txt", i), 0640, 022, 0, &fileInode, nil); st != 0 {
+				t.Fatalf("create: %s", st)
+			}
+			d.files = append(d.files, fileInode)
+		}
+
+		// recurse into subdirectories
+		for i, subDir := range d.subDirs {
+			d.subDirs[i].inode = buildDir(subDir, d.inode)
+		}
+
+		return d.inode
+	}
+	buildDir(testDir, mainInode)
+
+	var getInodes func(d *dir, depth int) []Ino
+	getInodes = func(d *dir, depth int) []Ino {
+		if depth == 0 {
+			return nil
+		}
+		var inodes []Ino
+		inodes = append(inodes, d.files...)
+		for _, subDir := range d.subDirs {
+			inodes = append(inodes, getInodes(subDir, depth-1)...)
+		}
+		return inodes
+	}
+
+	type result struct {
+		inodes []Ino
+		err    syscall.Errno
+	}
+	type testCase struct {
+		name  string
+		inode Ino
+		depth int
+		want  result
+	}
+	cases := []testCase{
+		{
+			name:  "case1",
+			inode: testDir.inode,
+			depth: 0,
+			want: result{
+				inodes: getInodes(testDir, 0),
+				err:    0,
+			},
+		},
+		{
+			name:  "case2",
+			inode: testDir.inode,
+			depth: 1,
+			want: result{
+				inodes: getInodes(testDir, 1),
+			},
+		},
+		{
+			name:  "case3",
+			inode: testDir.inode,
+			depth: 10,
+			want: result{
+				inodes: getInodes(testDir, 10),
+			},
+		},
+		{
+			name:  "case4",
+			inode: testDir.subDirs[0].inode,
+			depth: 10,
+			want: result{
+				inodes: getInodes(testDir.subDirs[0], 10),
+			},
+		},
+	}
+
+	var isEqual func([]Ino, []Ino) bool
+	isEqual = func(inodes1, inodes2 []Ino) bool {
+		if len(inodes1) != len(inodes2) {
+			return false
+		}
+		inoMap := make(map[Ino]bool)
+		for _, inode := range inodes1 {
+			inoMap[inode] = true
+		}
+		for _, inode := range inodes2 {
+			if _, ok := inoMap[inode]; !ok {
+				return false
+			}
+		}
+		return true
+	}
+
+	for _, c := range cases {
+		inodes, err := m.getBase().GetFileInodes(Background, c.inode, c.depth)
+		assert.Equal(t, true, isEqual(c.want.inodes, inodes), "case: %v", c.name)
+		assert.Equal(t, c.want.err, err, "case: %v", c.name)
+	}
+}
+
+type testSlice struct {
+	Slice
+	chunkIndex  uint32
+	chunkOffset uint32
+}
+
+func testScanChunks(t *testing.T, m Meta) {
+	mainDir := "testScanChunks"
+	var mainInode Ino
+	if st := m.Mkdir(Background, RootInode, mainDir, 0640, 022, 0, &mainInode, nil); st != 0 {
+		t.Fatalf("mkdir: %s", st)
+	}
+	defer m.Rmdir(Background, RootInode, mainDir)
+
+	cases := [][]testSlice{
+		{
+			{
+				Slice:       Slice{Id: 1, Size: 10, Off: 0, Len: 10},
+				chunkIndex:  0,
+				chunkOffset: 0,
+			},
+		},
+		{
+			{
+				Slice:       Slice{Id: 2, Size: 10, Off: 0, Len: 10},
+				chunkIndex:  0,
+				chunkOffset: 0,
+			},
+			{
+				Slice:       Slice{Id: 3, Size: 10, Off: 0, Len: 10},
+				chunkIndex:  0,
+				chunkOffset: 10,
+			},
+			{
+				Slice:       Slice{Id: 4, Size: 20, Off: 0, Len: 20},
+				chunkIndex:  0,
+				chunkOffset: 20,
+			},
+		},
+	}
+	wants := []map[uint32]int{
+		{0: 1},
+		{0: 3},
+	}
+
+	for i, c := range cases {
+		doTestScanChunks(t, m, i, mainInode, c, wants[i])
+	}
+}
+
+func doTestScanChunks(t *testing.T, m Meta, testCnt int, dirInode Ino, ss []testSlice, wantChunk map[uint32]int) {
+	// create file
+	var fileInode Ino
+	if st := m.Create(Background, dirInode, fmt.Sprintf("%d.txt", testCnt), 0640, 022, 0, &fileInode, nil); st != 0 {
+		t.Fatalf("create: %s", st)
+	}
+	time.Sleep(500 * time.Millisecond)
+
+	for _, s := range ss {
+		if st := m.Write(Background, fileInode, s.chunkIndex, s.chunkOffset, s.Slice, time.Now()); st != 0 {
+			t.Fatalf("write end: %s", st)
+		}
+	}
+
+	var wg sync.WaitGroup
+	chunkChan := make(chan cchunk)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for c := range chunkChan {
+			if wantChunk[c.indx] != c.slices {
+				t.Errorf("slice cnt [inode %v, chunk index %v] not right, expect %d, actual %d", fileInode, c.indx, wantChunk[c.indx], c.slices)
+				return
+			}
+		}
+	}()
+
+	err := m.getBase().en.scanChunks(Background, fileInode, chunkChan)
+	assert.Nil(t, err)
+
+	close(chunkChan)
+	wg.Wait()
+}
+
 func testDirStat(t *testing.T, m Meta) {
 	testDir := "testDirStat"
 	var testInode Ino
diff --git a/pkg/meta/context.go b/pkg/meta/context.go
index fbaa450be1c9..712d317836f5 100644
--- a/pkg/meta/context.go
+++ b/pkg/meta/context.go
@@ -18,15 +18,8 @@ package meta
 
 import (
 	"context"
-	"strconv"
 )
 
-type Ino uint64
-
-func (i Ino) String() string {
-	return strconv.FormatUint(uint64(i), 10)
-}
-
 type CtxKey string
 
 type Context interface {
diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go
index 973bb9384e9b..cae1fd8c56b7 100644
--- a/pkg/meta/interface.go
+++ b/pkg/meta/interface.go
@@ -22,6 +22,7 @@ import (
 	"io"
 	"net/url"
 	"os"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"syscall"
@@ -102,8 +103,24 @@ const (
 
 const MaxName = 255
 const MaxSymlink = 4096
+
+type Ino uint64
+
 const RootInode Ino = 1
 const TrashInode Ino = 0x7FFFFFFF10000000 // larger than vfs.minInternalNode
+
+func (i Ino) String() string {
+	return strconv.FormatUint(uint64(i), 10)
+}
+
+func (i Ino) IsValid() bool {
+	return i >= RootInode
+}
+
+func (i Ino) IsTrash() bool {
+	return i >= TrashInode
+}
+
 var TrashName = ".trash"
 
 func isTrash(ino Ino) bool {
@@ -220,7 +237,7 @@ type Entry struct {
 }
 
 // Slice is a slice of a chunk.
-// Multiple slices could be combined together as a chunk.
+// Multiple slices could be combined as a chunk.
 type Slice struct {
 	Id   uint64
 	Size uint32
@@ -414,8 +431,11 @@ type Meta interface {
 	// Setlk sets a file range lock on given file.
 	Setlk(ctx Context, inode Ino, owner uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno
 
-	// Compact all the chunks by merge small slices together
+	// CompactAll compacts all chunks by merging small slices together
 	CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.Errno
+	// Compact compacts the chunks of the given file or directory tree
+	Compact(ctx Context, inode Ino, maxDepth int, compactConcurrency int, preFunc, postFunc func()) syscall.Errno
+
 	// ListSlices returns all slices used by all files.
 	ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, showProgress func()) syscall.Errno
 	// Remove all files and directories recursively.
diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go
index e38cb1495a24..77a82f206a5d 100644
--- a/pkg/meta/redis.go
+++ b/pkg/meta/redis.go
@@ -613,12 +613,12 @@ func (m *redisMeta) detachedNodes() string {
 	return m.prefix + "detachedNodes"
 }
 
-func (r *redisMeta) delSlices() string {
-	return r.prefix + "delSlices"
+func (m *redisMeta) delSlices() string {
+	return m.prefix + "delSlices"
 }
 
-func (r *redisMeta) allSessions() string {
-	return r.prefix + "allSessions"
+func (m *redisMeta) allSessions() string {
+	return m.prefix + "allSessions"
 }
 
 func (m *redisMeta) sessionInfos() string {
@@ -2823,14 +2823,14 @@ func (m *redisMeta) doDeleteFileData_(inode Ino, length uint64, tracking string)
 	_ = m.rdb.ZRem(ctx, m.delfiles(), tracking)
 }
 
-func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
+func (m *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
 	ctx := Background
 	start := time.Now()
 	stop := fmt.Errorf("reach limit")
 	var count int
 	var ss []Slice
 	var rs []*redis.IntCmd
-	err := r.hscan(ctx, r.delSlices(), func(keys []string) error {
+	err := m.hscan(ctx, m.delSlices(), func(keys []string) error {
 		for i := 0; i < len(keys); i += 2 {
 			key := keys[i]
 			ps := strings.Split(key, "_")
@@ -2845,34 +2845,34 @@ func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
 				continue
 			}
 
-			if err := r.txn(ctx, func(tx *redis.Tx) error {
+			if err := m.txn(ctx, func(tx *redis.Tx) error {
 				ss, rs = ss[:0], rs[:0]
-				val, e := tx.HGet(ctx, r.delSlices(), key).Result()
+				val, e := tx.HGet(ctx, m.delSlices(), key).Result()
 				if e == redis.Nil {
 					return nil
 				} else if e != nil {
 					return e
 				}
 				buf := []byte(val)
-				r.decodeDelayedSlices(buf, &ss)
+				m.decodeDelayedSlices(buf, &ss)
 				if len(ss) == 0 {
 					return fmt.Errorf("invalid value for delSlices %s: %v", key, buf)
 				}
 				_, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
 					for _, s := range ss {
-						rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.Id, s.Size), -1))
+						rs = append(rs, pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.Id, s.Size), -1))
 					}
-					pipe.HDel(ctx, r.delSlices(), key)
+					pipe.HDel(ctx, m.delSlices(), key)
 					return nil
 				})
 				return e
-			}, r.delSlices()); err != nil {
+			}, m.delSlices()); err != nil {
 				logger.Warnf("Cleanup delSlices %s: %s", key, err)
 				continue
 			}
 			for i, s := range ss {
 				if rs[i].Err() == nil && rs[i].Val() < 0 {
-					r.deleteSlice(s.Id, s.Size)
+					m.deleteSlice(s.Id, s.Size)
 					count++
 				}
 			}
@@ -3054,6 +3054,36 @@ func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar)
 	})
 }
 
+func (m *redisMeta) scanChunks(ctx Context, inode Ino, ch chan<- cchunk) error {
+	p := m.rdb.Pipeline()
+	return m.scan(ctx, fmt.Sprintf("c%d_*", inode), func(keys []string) error {
+		for _, key := range keys {
+			_ = p.LLen(ctx, key)
+		}
+		cmds, err := p.Exec(ctx)
+		if err != nil {
+			for _, c := range cmds {
+				if c.Err() != nil {
+					logger.Warnf("Scan chunks with command %s: %s", c.String(), c.Err())
+				}
+			}
+			return err
+		}
+		keyFormat := m.prefix + "c" + inode.String() + "_%d"
+		for i, cmd := range cmds {
+			cnt := cmd.(*redis.IntCmd).Val()
+			if cnt > 1 {
+				var index uint32
+				n, err := fmt.Sscanf(keys[i], keyFormat, &index)
+				if err == nil && n == 1 {
+					ch <- cchunk{inode, index, int(cnt)}
+				}
+			}
+		}
+		return nil
+	})
+}
+
 func (m *redisMeta) cleanupLeakedInodes(delete bool) {
 	var ctx = Background
 	var foundInodes = make(map[Ino]struct{})
diff --git a/pkg/meta/redis_lock.go b/pkg/meta/redis_lock.go
index 897964a44200..595a8572afd4 100644
--- a/pkg/meta/redis_lock.go
+++ b/pkg/meta/redis_lock.go
@@ -29,11 +29,11 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-func (r *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
-	ikey := r.flockKey(inode)
-	lkey := r.ownerKey(owner)
+func (m *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
+	ikey := m.flockKey(inode)
+	lkey := m.ownerKey(owner)
 	if ltype == F_UNLCK {
-		return errno(r.txn(ctx, func(tx *redis.Tx) error {
+		return errno(m.txn(ctx, func(tx *redis.Tx) error {
 			lkeys, err := tx.HKeys(ctx, ikey).Result()
 			if err != nil {
 				return err
@@ -41,7 +41,7 @@ func (r *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, bl
 			_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
 				pipe.HDel(ctx, ikey, lkey)
 				if len(lkeys) == 1 && lkeys[0] == lkey {
-					pipe.SRem(ctx, r.lockedKey(r.sid), ikey)
+					pipe.SRem(ctx, m.lockedKey(m.sid), ikey)
 				}
 				return nil
 			})
@@ -50,7 +50,7 @@ func (r *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, bl
 	}
 	var err error
 	for {
-		err = r.txn(ctx, func(tx *redis.Tx) error {
+		err = m.txn(ctx, func(tx *redis.Tx) error {
 			owners, err := tx.HGetAll(ctx, ikey).Result()
 			if err != nil {
 				return err
@@ -73,7 +73,7 @@ func (r *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, bl
 			}
 			_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
 				pipe.HSet(ctx, ikey, lkey, "W")
-				pipe.SAdd(ctx, r.lockedKey(r.sid), ikey)
+				pipe.SAdd(ctx, m.lockedKey(m.sid), ikey)
 				return nil
 			})
 			return err
@@ -94,15 +94,15 @@ func (r *redisMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, bl
 	return errno(err)
 }
 
-func (r *redisMeta) Getlk(ctx Context, inode Ino, owner uint64, ltype *uint32, start, end *uint64, pid *uint32) syscall.Errno {
+func (m *redisMeta) Getlk(ctx Context, inode Ino, owner uint64, ltype *uint32, start, end *uint64, pid *uint32) syscall.Errno {
 	if *ltype == F_UNLCK {
 		*start = 0
 		*end = 0
 		*pid = 0
 		return 0
 	}
-	lkey := r.ownerKey(owner)
-	owners, err := r.rdb.HGetAll(ctx, r.plockKey(inode)).Result()
+	lkey := m.ownerKey(owner)
+	owners, err := m.rdb.HGetAll(ctx, m.plockKey(inode)).Result()
 	if err != nil {
 		return errno(err)
 	}
@@ -116,7 +116,7 @@ func (r *redisMeta) Getlk(ctx Context, inode Ino, owner uint64, ltype *uint32, s
 			*start = l.Start
 			*end = l.End
 			sid, _ := strconv.Atoi(strings.Split(k, "_")[0])
-			if uint64(sid) == r.sid {
+			if uint64(sid) == m.sid {
 				*pid = l.Pid
 			} else {
 				*pid = 0
@@ -132,13 +132,13 @@ func (r *redisMeta) Getlk(ctx Context, inode Ino, owner uint64, ltype *uint32, s
 	return 0
 }
 
-func (r *redisMeta) Setlk(ctx Context, inode Ino, owner uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
-	ikey := r.plockKey(inode)
-	lkey := r.ownerKey(owner)
+func (m *redisMeta) Setlk(ctx Context, inode Ino, owner uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
+	ikey := m.plockKey(inode)
+	lkey := m.ownerKey(owner)
 	var err error
 	lock := plockRecord{ltype, pid, start, end}
 	for {
-		err = r.txn(ctx, func(tx *redis.Tx) error {
+		err = m.txn(ctx, func(tx *redis.Tx) error {
 			if ltype == F_UNLCK {
 				d, err := tx.HGet(ctx, ikey, lkey).Result()
 				if err != nil && err != redis.Nil {
@@ -160,7 +160,7 @@ func (r *redisMeta) Setlk(ctx Context, inode Ino, owner uint64, block bool, ltyp
 				if len(ls) == 0 {
 					pipe.HDel(ctx, ikey, lkey)
 					if len(lkeys) == 1 && lkeys[0] == lkey {
-						pipe.SRem(ctx, r.lockedKey(r.sid), ikey)
+						pipe.SRem(ctx, m.lockedKey(m.sid), ikey)
 					}
 				} else {
 					pipe.HSet(ctx, ikey, lkey, dumpLocks(ls))
@@ -187,7 +187,7 @@ func (r *redisMeta) Setlk(ctx Context, inode Ino, owner uint64, block bool, ltyp
 			ls = updateLocks(ls, lock)
 			_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
 				pipe.HSet(ctx, ikey, lkey, dumpLocks(ls))
-				pipe.SAdd(ctx, r.lockedKey(r.sid), ikey)
+				pipe.SAdd(ctx, m.lockedKey(m.sid), ikey)
 				return nil
 			})
 			return err
@@ -208,11 +208,11 @@ func (r *redisMeta) Setlk(ctx Context, inode Ino, owner uint64, block bool, ltyp
 	return errno(err)
 }
 
-func (r *redisMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error) {
-	fKey := r.flockKey(inode)
-	pKey := r.plockKey(inode)
+func (m *redisMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error) {
+	fKey := m.flockKey(inode)
+	pKey := m.plockKey(inode)
 
-	rawFLocks, err := r.rdb.HGetAll(ctx, fKey).Result()
+	rawFLocks, err := m.rdb.HGetAll(ctx, fKey).Result()
 	if err != nil {
 		return nil, nil, err
 	}
@@ -225,7 +225,7 @@ func (r *redisMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FL
 		flocks = append(flocks, FLockItem{*owner, v})
 	}
 
-	rawPLocks, err := r.rdb.HGetAll(ctx, pKey).Result()
+	rawPLocks, err := m.rdb.HGetAll(ctx, pKey).Result()
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go
index bbddf0965b26..037a3622b490 100644
--- a/pkg/meta/sql.go
+++ b/pkg/meta/sql.go
@@ -2914,6 +2914,22 @@ func (m *dbMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er
 	})
 }
 
+func (m *dbMeta) scanChunks(ctx Context, inode Ino, ch chan<- cchunk) error {
+	return m.roTxn(func(s *xorm.Session) error {
+		var cs []chunk
+		err := s.Table(&chunk{}).Where("inode = ?", inode).Find(&cs)
+		if err != nil {
+			return err
+		}
+		for _, c := range cs {
+			if len(c.Slices) > sliceBytes {
+				ch <- cchunk{c.Inode, c.Indx, len(c.Slices) / sliceBytes}
+			}
+		}
+		return nil
+	})
+}
+
 func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, showProgress func()) syscall.Errno {
 	if delete {
 		m.doCleanupSlices()
diff --git a/pkg/meta/sql_lock.go b/pkg/meta/sql_lock.go
index 3dcd356aa3f8..ee00d3869dd2 100644
--- a/pkg/meta/sql_lock.go
+++ b/pkg/meta/sql_lock.go
@@ -249,9 +249,9 @@ func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype
 	return err
 }
 
-func (r *dbMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error) {
+func (m *dbMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error) {
 	var fs []flock
-	if err := r.db.Find(&fs, &flock{Inode: inode}); err != nil {
+	if err := m.db.Find(&fs, &flock{Inode: inode}); err != nil {
 		return nil, nil, err
 	}
 
@@ -261,7 +261,7 @@ func (r *dbMeta) ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLock
 	}
 
 	var ps []plock
-	if err := r.db.Find(&ps, &plock{Inode: inode}); err != nil {
+	if err := m.db.Find(&ps, &plock{Inode: inode}); err != nil {
 		return nil, nil, err
 	}
 
diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go
index 38a9c0748946..fa9b13defc09 100644
--- a/pkg/meta/tkv.go
+++ b/pkg/meta/tkv.go
@@ -2286,13 +2286,13 @@ func (m *kvMeta) deleteChunk(inode Ino, indx uint32) error {
 	return nil
 }
 
-func (r *kvMeta) cleanupZeroRef(id uint64, size uint32) {
-	_ = r.txn(func(tx *kvTxn) error {
-		v := tx.incrBy(r.sliceKey(id, size), 0)
+func (m *kvMeta) cleanupZeroRef(id uint64, size uint32) {
+	_ = m.txn(func(tx *kvTxn) error {
+		v := tx.incrBy(m.sliceKey(id, size), 0)
 		if v != 0 {
 			return syscall.EINVAL
 		}
-		tx.delete(r.sliceKey(id, size))
+		tx.delete(m.sliceKey(id, size))
 		return nil
 	})
 }
@@ -2517,6 +2517,19 @@ func (m *kvMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er
 	})
 }
 
+func (m *kvMeta) scanChunks(ctx Context, inode Ino, ch chan<- cchunk) error {
+	klen := 1 + 8 + 1 + 4 // key layout: "A" + inode (8 bytes) + "C" + chunk index (4 bytes)
+	return m.client.scan(m.fmtKey("A", inode), func(k, v []byte) {
+		if len(k) == klen && k[1+8] == 'C' && len(v) > sliceBytes {
+			ch <- cchunk{
+				inode:  m.decodeInode(k[1:9]),
+				indx:   binary.BigEndian.Uint32(k[10:]),
+				slices: len(v) / sliceBytes,
+			}
+		}
+	})
+}
+
 func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, showProgress func()) syscall.Errno {
 	if delete {
 		m.doCleanupSlices()
diff --git a/pkg/meta/utils.go b/pkg/meta/utils.go
index 30a0e9c20253..56bc1f72da4e 100644
--- a/pkg/meta/utils.go
+++ b/pkg/meta/utils.go
@@ -17,6 +17,7 @@ package meta
 
 import (
+	"context"
 	"fmt"
 	"net/url"
 	"path"
@@ -454,6 +455,70 @@ func (m *baseMeta) getDirSummary(ctx Context, inode Ino, summary *Summary, recur
 	return err
 }
 
+// GetFileInodes gets all file inodes under root recursively, down to maxDepth levels
+func (m *baseMeta) GetFileInodes(ctx Context, root Ino, maxDepth int) ([]Ino, syscall.Errno) {
+	var attr Attr
+	if st := m.GetAttr(ctx, root, &attr); st != 0 {
+		return nil, st
+	}
+
+	if attr.Typ != TypeDirectory {
+		return []Ino{root}, 0
+	}
+
+	var inodes []Ino
+	inodeMu := sync.Mutex{}
+	concurrent := make(chan struct{}, 50)
+
+	newCtx, cancel := context.WithCancelCause(ctx)
+	defer cancel(nil)
+
+	var dfs func(inode Ino, depth int)
+	dfs = func(inode Ino, depth int) {
+		if depth <= 0 {
+			return
+		}
+
+		var entries []*Entry
+		if err := m.en.doReaddir(ctx, inode, 1, &entries, -1); err != 0 {
+			logger.Errorf("doReaddir inode %d error: %v", inode, err)
+			cancel(err)
+			return
+		}
+
+		var wg sync.WaitGroup
+		for _, e := range entries {
+			currInode := e.Inode
+			if e.Attr.Typ != TypeDirectory {
+				inodeMu.Lock()
+				inodes = append(inodes, currInode)
+				inodeMu.Unlock()
+				continue
+			}
+
+			select {
+			case <-newCtx.Done():
+				return
+			case concurrent <- struct{}{}:
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					dfs(currInode, depth-1)
+					<-concurrent
+				}()
+			}
+		}
+
+		wg.Wait()
+	}
+
+	dfs(root, maxDepth)
+	if newCtx.Err() != nil {
+		return nil, errno(newCtx.Err())
+	}
+	return inodes, 0
+}
+
 func (m *baseMeta) GetTreeSummary(ctx Context, root *TreeSummary, depth, topN uint8, strict bool,
 	updateProgress func(count uint64, bytes uint64)) syscall.Errno {
 	var attr Attr
diff --git a/pkg/object/interface.go b/pkg/object/interface.go
index 4e6b2f0393ee..3b33376ee971 100644
--- a/pkg/object/interface.go
+++ b/pkg/object/interface.go
@@ -74,7 +74,7 @@ type Limits struct {
 // ObjectStorage is the interface for object storage.
 // all of these API should be idempotent.
 type ObjectStorage interface {
-	// Description of the object storage.
+	// String returns the description of the object storage.
 	String() string
 	// Limits of the object storage.
 	Limits() Limits
diff --git a/pkg/utils/file.go b/pkg/utils/file.go
new file mode 100644
index 000000000000..03c1c5f11c82
--- /dev/null
+++ b/pkg/utils/file.go
@@ -0,0 +1,36 @@
+/*
+ * JuiceFS, Copyright 2020 Juicedata, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+	"fmt"
+	"path/filepath"
+)
+
+// GetInode returns the inode number of the file at the given path
+func GetInode(path string) (uint64, error) {
+	dpath, err := filepath.Abs(path)
+	if err != nil {
+		return 0, fmt.Errorf("abs of %s error: %w", path, err)
+	}
+	inode, err := GetFileInode(dpath)
+	if err != nil {
+		return 0, fmt.Errorf("lookup inode for %s error: %w", path, err)
+	}
+
+	return inode, nil
+}
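---

For reviewers who want to exercise the new entry point outside the CLI, here is a minimal sketch of driving `Meta.Compact` directly. It mirrors what `cmd/compact_unix.go` does in condensed form; `client`, `store`, `chunkConf` and `inode` are assumed to be initialized as in that file, and the `total`/`done` counters are purely illustrative:

```go
// Sketch only: assumes client (meta.Meta), store (chunk.ChunkStore),
// chunkConf (chunk.Config) and inode (meta.Ino) are set up as in
// cmd/compact_unix.go; "sync/atomic" is imported.
var total, done int64

// A CompactChunk handler must be registered first: it is what actually
// rewrites the merged slices through the chunk store.
client.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
	slices := args[0].([]meta.Slice)
	id := args[1].(uint64)
	return vfs.Compact(chunkConf, store, slices, id)
})

st := client.Compact(meta.Background, inode,
	2,  // maxDepth: how deep GetFileInodes walks the directory tree
	10, // compactConcurrency: number of compaction workers
	func() { atomic.AddInt64(&total, 1) }, // preFunc: runs before each chunk is compacted
	func() { atomic.AddInt64(&done, 1) },  // postFunc: runs after each chunk is compacted
)
if st != 0 {
	logger.Errorf("compact inode %d: %s", inode, st)
}
```

Without the `meta.CompactChunk` handler the engine has no way to rewrite slice data, so per-chunk compaction would fail; this is why both `gc.go` and `compact_unix.go` register it before calling into the meta layer.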