From e49a9c6ab00c4a87e4103557b7084a7d358d5c1e Mon Sep 17 00:00:00 2001 From: zhijian Date: Wed, 17 Nov 2021 16:17:41 +0800 Subject: [PATCH 1/5] speed up for sql --- pkg/meta/load_dump_test.go | 18 +++ pkg/meta/sql.go | 265 ++++++++++++++++++++++++++++++++----- 2 files changed, 249 insertions(+), 34 deletions(-) diff --git a/pkg/meta/load_dump_test.go b/pkg/meta/load_dump_test.go index 6b3b2fd4da80..cd73a64af1d5 100644 --- a/pkg/meta/load_dump_test.go +++ b/pkg/meta/load_dump_test.go @@ -113,6 +113,24 @@ func TestLoadDump(t *testing.T) { t.Fatalf("dump meta: %s", err) } }) + + t.Run("Metadata Engine: SQLite --SubDir d1", func(t *testing.T) { + os.Remove("test10.db") + m := testLoad(t, "sqlite3://test10.db", sampleFile) + fp, err := os.OpenFile("sqlite3_subdir.dump", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + t.Fatalf("open file: %s", "sqlite3_subdir.dump") + } + defer fp.Close() + switch r := m.(type) { + case *dbMeta: + r.root = 3 + } + if err = m.DumpMeta(fp); err != nil { + t.Fatalf("dump meta: %s", err) + } + }) + t.Run("Metadata Engine: TKV", func(t *testing.T) { os.Remove(settingPath) m := testLoad(t, "memkv://test/jfs", sampleFile) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index ab2507e536c5..2f4495e54a67 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -18,6 +18,7 @@ package meta import ( + "bufio" "bytes" "database/sql" "encoding/json" @@ -150,6 +151,15 @@ type dbMeta struct { freeMu sync.Mutex freeInodes freeID freeChunks freeID + + snap *dbSnap +} +type dbSnap struct { + node map[Ino]*node + symlink map[Ino]*symlink + xattr map[Ino][]*xattr + edges map[Ino][]*edge + chunk map[string]*chunk } func newSQLMeta(driver, addr string, conf *Config) (Meta, error) { @@ -2637,35 +2647,170 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) { return nil }) } +func (m *dbMeta) dumpEntryFast(inode Ino) (*DumpedEntry, error) { + e := &DumpedEntry{} + n, ok := m.snap.node[inode] + if !ok { + return nil, fmt.Errorf("inode %d not found", inode) + } + attr := &Attr{} + m.parseAttr(n, attr) + e.Attr = dumpAttr(attr) + e.Attr.Inode = inode -func (m *dbMeta) dumpDir(inode Ino, showProgress func(totalIncr, currentIncr int64)) (map[string]*DumpedEntry, error) { - var edges []edge - if err := m.engine.Find(&edges, &edge{Parent: inode}); err != nil { - return nil, err + rows, ok := m.snap.xattr[inode] + if ok && len(rows) > 0 { + xattrs := make([]*DumpedXattr, 0, len(rows)) + for _, x := range rows { + xattrs = append(xattrs, &DumpedXattr{x.Name, string(x.Value)}) + } + sort.Slice(xattrs, func(i, j int) bool { return xattrs[i].Name < xattrs[j].Name }) + e.Xattrs = xattrs } + + if attr.Typ == TypeFile { + for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ { + c, ok := m.snap.chunk[fmt.Sprintf("%d-%d", inode, indx)] + if !ok { + return nil, fmt.Errorf("no found chunk target for inode %d indx %d", inode, indx) + } + ss := readSliceBuf(c.Slices) + slices := make([]*DumpedSlice, 0, len(ss)) + for _, s := range ss { + slices = append(slices, &DumpedSlice{Chunkid: s.chunkid, Pos: s.pos, Size: s.size, Off: s.off, Len: s.len}) + } + e.Chunks = append(e.Chunks, &DumpedChunk{indx, slices}) + } + } else if attr.Typ == TypeSymlink { + l, ok := m.snap.symlink[inode] + if !ok { + return nil, fmt.Errorf("no link target for inode %d", inode) + } + e.Symlink = l.Target + } + return e, nil +} + +func (m *dbMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth int, showProgress func(totalIncr, currentIncr int64)) error { + bwWrite := func(s string) { + if _, err := bw.WriteString(s); err != nil { + panic(err) + } + } + var edges []*edge + var err error + var ok bool + if m.root == 1 { + edges, ok = m.snap.edges[inode] + if !ok { + return fmt.Errorf("no edge target for inode %d", inode) + } + } else { + if err := m.engine.Find(&edges, &edge{Parent: inode}); err != nil { + return err + } + } + if showProgress != nil { showProgress(int64(len(edges)), 0) } - entries := make(map[string]*DumpedEntry) - for _, e := range edges { - entry, err := m.dumpEntry(e.Inode) + if err := tree.writeJsonWithOutEntry(bw, depth); err != nil { + return err + } + sort.Slice(edges, func(i, j int) bool { return edges[i].Name < edges[j].Name }) + + for idx, e := range edges { + var entry *DumpedEntry + if m.root == 1 { + entry, err = m.dumpEntryFast(e.Inode) + } else { + entry, err = m.dumpEntry(e.Inode) + } if err != nil { - return nil, err + return err } + entry.Name = e.Name if e.Type == TypeDirectory { - if entry.Entries, err = m.dumpDir(e.Inode, showProgress); err != nil { - return nil, err - } + err = m.dumpDir(e.Inode, entry, bw, depth+2, showProgress) + } else { + err = entry.writeJSON(bw, depth+2) + } + if err != nil { + return err + } + if idx != len(edges)-1 { + bwWrite(",") } - entries[e.Name] = entry if showProgress != nil { showProgress(0, 1) } } - return entries, nil + bwWrite(fmt.Sprintf("\n%s}\n%s}", strings.Repeat(jsonIndent, depth+1), strings.Repeat(jsonIndent, depth))) + return nil } -func (m *dbMeta) DumpMeta(w io.Writer) error { +func (m *dbMeta) makeSnap() error { + m.snap = &dbSnap{ + node: make(map[Ino]*node), + symlink: make(map[Ino]*symlink), + xattr: make(map[Ino][]*xattr), + edges: make(map[Ino][]*edge), + chunk: make(map[string]*chunk), + } + + bufferSize := 10000 + + if err := m.engine.BufferSize(bufferSize).Iterate(new(node), func(idx int, bean interface{}) error { + n := bean.(*node) + m.snap.node[n.Inode] = n + return nil + }); err != nil { + return err + } + + if err := m.engine.BufferSize(bufferSize).Iterate(new(symlink), func(idx int, bean interface{}) error { + s := bean.(*symlink) + m.snap.symlink[s.Inode] = s + return nil + }); err != nil { + return err + } + if err := m.engine.BufferSize(bufferSize).Iterate(new(edge), func(idx int, bean interface{}) error { + e := bean.(*edge) + m.snap.edges[e.Parent] = append(m.snap.edges[e.Parent], e) + return nil + }); err != nil { + return err + } + + if err := m.engine.BufferSize(bufferSize).Iterate(new(xattr), func(idx int, bean interface{}) error { + x := bean.(*xattr) + m.snap.xattr[x.Inode] = append(m.snap.xattr[x.Inode], x) + return nil + }); err != nil { + return err + } + + if err := m.engine.BufferSize(bufferSize).Iterate(new(chunk), func(idx int, bean interface{}) error { + c := bean.(*chunk) + m.snap.chunk[fmt.Sprintf("%d-%d", c.Inode, c.Indx)] = c + return nil + }); err != nil { + return err + } + return nil +} + +func (m *dbMeta) DumpMeta(w io.Writer) (err error) { + defer func() { + if p := recover(); p != nil { + if e, ok := p.(error); ok { + err = e + } else { + err = fmt.Errorf("DumpMeta error: %v", p) + } + } + }() var drows []delfile if err := m.engine.Find(&drows); err != nil { return err @@ -2675,32 +2820,24 @@ func (m *dbMeta) DumpMeta(w io.Writer) error { dels = append(dels, &DumpedDelFile{row.Inode, row.Length, row.Expire}) } - tree, err := m.dumpEntry(m.root) - if err != nil { - return err + var tree *DumpedEntry + if m.root == 1 { + if err = m.makeSnap(); err != nil { + return fmt.Errorf("Fetch all metadata from DB: %s", err) + } + tree, err = m.dumpEntryFast(m.root) + } else { + tree, err = m.dumpEntry(m.root) } - - var total int64 = 1 // root - progress, bar := utils.NewDynProgressBar("Dump dir progress: ", false) - bar.Increment() - if tree.Entries, err = m.dumpDir(m.root, func(totalIncr, currentIncr int64) { - total += totalIncr - bar.SetTotal(total, false) - bar.IncrInt64(currentIncr) - }); err != nil { + if err != nil { return err } - if bar.Current() != total { - logger.Warnf("Dumped %d / total %d, some entries are not dumped", bar.Current(), total) - } - bar.SetTotal(0, true) - progress.Wait() + tree.Name = "FSTree" format, err := m.Load() if err != nil { return err } - var crows []counter if err = m.engine.Find(&crows); err != nil { return err @@ -2741,7 +2878,34 @@ func (m *dbMeta) DumpMeta(w io.Writer) error { dels, tree, } - return dm.writeJSON(w) + + bw, err := dm.writeJsonWithOutTree(w) + if err != nil { + return err + } + + var total int64 = 1 // root + progress, bar := utils.NewDynProgressBar("Dump dir progress: ", false) + bar.Increment() + + if err = m.dumpDir(m.root, tree, bw, 1, func(totalIncr, currentIncr int64) { + total += totalIncr + bar.SetTotal(total, false) + bar.IncrInt64(currentIncr) + }); err != nil { + return err + } + + if _, err = bw.WriteString("\n}\n"); err != nil { + return err + } + if bar.Current() != total { + logger.Warnf("Dumped %d / total %d, some entries are not dumped", bar.Current(), total) + } + bar.SetTotal(0, true) + progress.Wait() + + return bw.Flush() } func (m *dbMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[uint64]*chunkRef) error { @@ -2772,11 +2936,13 @@ func (m *dbMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[uint64]* slices := make([]byte, 0, sliceBytes*len(c.Slices)) for _, s := range c.Slices { slices = append(slices, marshalSlice(s.Pos, s.Chunkid, s.Size, s.Off, s.Len)...) + m.Lock() if refs[s.Chunkid] == nil { refs[s.Chunkid] = &chunkRef{s.Chunkid, s.Size, 1} } else { refs[s.Chunkid].Refs++ } + m.Unlock() if cs.NextChunk <= int64(s.Chunkid) { cs.NextChunk = int64(s.Chunkid) + 1 } @@ -2882,11 +3048,42 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { NextSession: 1, } refs := make(map[uint64]*chunkRef) + + maxNum := 100 + pool := make(chan struct{}, maxNum) + errCh := make(chan error, 100) + done := make(chan struct{}, 1) + var wg sync.WaitGroup for _, entry := range entries { - if err = m.loadEntry(entry, counters, refs); err != nil { + select { + case err = <-errCh: return err + default: } + pool <- struct{}{} + wg.Add(1) + go func(entry *DumpedEntry) { + defer func() { + wg.Done() + <-pool + }() + if err = m.loadEntry(entry, counters, refs); err != nil { + errCh <- err + } + }(entry) + } + + go func() { + wg.Wait() + close(done) + }() + + select { + case err = <-errCh: + return err + case <-done: } + logger.Infof("Dumped counters: %+v", *dm.Counters) logger.Infof("Loaded counters: %+v", *counters) From 069606145076844cf33254e7962b5cde5df0d698 Mon Sep 17 00:00:00 2001 From: zhijian Date: Wed, 17 Nov 2021 16:24:52 +0800 Subject: [PATCH 2/5] Remove unused functions --- pkg/meta/dump.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/pkg/meta/dump.go b/pkg/meta/dump.go index 938c48852a0e..db226b223bb4 100644 --- a/pkg/meta/dump.go +++ b/pkg/meta/dump.go @@ -211,27 +211,6 @@ type DumpedMeta struct { FSTree *DumpedEntry `json:",omitempty"` } -func (dm *DumpedMeta) writeJSON(w io.Writer) error { - tree := dm.FSTree - dm.FSTree = nil - data, err := json.MarshalIndent(dm, "", jsonIndent) - if err != nil { - return err - } - bw := bufio.NewWriterSize(w, jsonWriteSize) - if _, err = bw.Write(append(data[:len(data)-2], ',')); err != nil { // delete \n} - return err - } - tree.Name = "FSTree" - if err = tree.writeJSON(bw, 1); err != nil { - return err - } - if _, err = bw.WriteString("\n}\n"); err != nil { - return err - } - return bw.Flush() -} - func (dm *DumpedMeta) writeJsonWithOutTree(w io.Writer) (*bufio.Writer, error) { dm.FSTree = nil data, err := json.MarshalIndent(dm, "", jsonIndent) From 7a960d40ad2ffb9a8a9637ff01de3bd6fed14992 Mon Sep 17 00:00:00 2001 From: zhijian Date: Thu, 18 Nov 2021 10:03:13 +0800 Subject: [PATCH 3/5] 1. Remove unused code 2. Skip the case where entry is equal to nil --- pkg/meta/dump.go | 28 ---------------------------- pkg/meta/redis.go | 4 ++++ pkg/meta/sql.go | 45 +++++++++++++++++++++++++++++++-------------- 3 files changed, 35 insertions(+), 42 deletions(-) diff --git a/pkg/meta/dump.go b/pkg/meta/dump.go index db226b223bb4..8391576ad7a6 100644 --- a/pkg/meta/dump.go +++ b/pkg/meta/dump.go @@ -171,34 +171,6 @@ func (de *DumpedEntry) writeJsonWithOutEntry(bw *bufio.Writer, depth int) error return err } write(fmt.Sprintf("\n%s\"attr\": %s", fieldPrefix, data)) - if len(de.Symlink) > 0 { - write(fmt.Sprintf(",\n%s\"symlink\": \"%s\"", fieldPrefix, de.Symlink)) - } - if len(de.Xattrs) > 0 { - if data, err = json.Marshal(de.Xattrs); err != nil { - return err - } - write(fmt.Sprintf(",\n%s\"xattrs\": %s", fieldPrefix, data)) - } - if len(de.Chunks) == 1 { - if data, err = json.Marshal(de.Chunks); err != nil { - return err - } - write(fmt.Sprintf(",\n%s\"chunks\": %s", fieldPrefix, data)) - } else if len(de.Chunks) > 1 { - chunkPrefix := fieldPrefix + jsonIndent - write(fmt.Sprintf(",\n%s\"chunks\": [", fieldPrefix)) - for i, c := range de.Chunks { - if data, err = json.Marshal(c); err != nil { - return err - } - write(fmt.Sprintf("\n%s%s", chunkPrefix, data)) - if i != len(de.Chunks)-1 { - write(",") - } - } - write(fmt.Sprintf("\n%s]", fieldPrefix)) - } write(fmt.Sprintf(",\n%s\"entries\": {", fieldPrefix)) return nil } diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 4102f75cbebf..f6684580c0ff 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2929,6 +2929,10 @@ func (m *redisMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, dept return err } } + if entry == nil { + continue + } + entry.Name = name if typ == TypeDirectory { err = m.dumpDir(inode, entry, bw, depth+2, showProgress) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 2f4495e54a67..5bd6752e4088 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2599,7 +2599,8 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) { return err } if !ok { - return fmt.Errorf("inode %d not found", inode) + logger.Warnf("The entry of the inode was not found. inode: %v", inode) + return nil } attr := &Attr{} m.parseAttr(n, attr) @@ -2622,9 +2623,13 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) { if attr.Typ == TypeFile { for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ { c := &chunk{Inode: inode, Indx: indx} - if _, err = m.engine.Get(c); err != nil { + if ok, err = m.engine.Get(c); err != nil { return err } + if !ok { + logger.Warnf("no found chunk target for inode %d indx %d", inode, indx) + return nil + } ss := readSliceBuf(c.Slices) slices := make([]*DumpedSlice, 0, len(ss)) for _, s := range ss { @@ -2639,7 +2644,8 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) { return err } if !ok { - return fmt.Errorf("no link target for inode %d", inode) + logger.Warnf("no link target for inode %d", inode) + return nil } e.Symlink = l.Target } @@ -2647,11 +2653,12 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) { return nil }) } -func (m *dbMeta) dumpEntryFast(inode Ino) (*DumpedEntry, error) { +func (m *dbMeta) dumpEntryFast(inode Ino) *DumpedEntry { e := &DumpedEntry{} n, ok := m.snap.node[inode] if !ok { - return nil, fmt.Errorf("inode %d not found", inode) + logger.Warnf("The entry of the inode was not found. inode: %v", inode) + return nil } attr := &Attr{} m.parseAttr(n, attr) @@ -2672,7 +2679,8 @@ func (m *dbMeta) dumpEntryFast(inode Ino) (*DumpedEntry, error) { for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ { c, ok := m.snap.chunk[fmt.Sprintf("%d-%d", inode, indx)] if !ok { - return nil, fmt.Errorf("no found chunk target for inode %d indx %d", inode, indx) + logger.Warnf("no found chunk target for inode %d indx %d", inode, indx) + return nil } ss := readSliceBuf(c.Slices) slices := make([]*DumpedSlice, 0, len(ss)) @@ -2684,11 +2692,12 @@ func (m *dbMeta) dumpEntryFast(inode Ino) (*DumpedEntry, error) { } else if attr.Typ == TypeSymlink { l, ok := m.snap.symlink[inode] if !ok { - return nil, fmt.Errorf("no link target for inode %d", inode) + logger.Warnf("no link target for inode %d", inode) + return nil } e.Symlink = l.Target } - return e, nil + return e } func (m *dbMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth int, showProgress func(totalIncr, currentIncr int64)) error { @@ -2722,13 +2731,18 @@ func (m *dbMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth i for idx, e := range edges { var entry *DumpedEntry if m.root == 1 { - entry, err = m.dumpEntryFast(e.Inode) + entry = m.dumpEntryFast(e.Inode) } else { entry, err = m.dumpEntry(e.Inode) + if err != nil { + return err + } } - if err != nil { - return err + + if entry == nil { + continue } + entry.Name = e.Name if e.Type == TypeDirectory { err = m.dumpDir(e.Inode, entry, bw, depth+2, showProgress) @@ -2825,12 +2839,15 @@ func (m *dbMeta) DumpMeta(w io.Writer) (err error) { if err = m.makeSnap(); err != nil { return fmt.Errorf("Fetch all metadata from DB: %s", err) } - tree, err = m.dumpEntryFast(m.root) + tree = m.dumpEntryFast(m.root) } else { tree, err = m.dumpEntry(m.root) + if err != nil { + return err + } } - if err != nil { - return err + if tree == nil { + return errors.New("The entry of the root inode was not found") } tree.Name = "FSTree" From a077919ff1fa6520e8e076c856374367096ac271 Mon Sep 17 00:00:00 2001 From: zhijian Date: Thu, 18 Nov 2021 11:10:26 +0800 Subject: [PATCH 4/5] fix add dir xattrs write --- pkg/meta/dump.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/meta/dump.go b/pkg/meta/dump.go index 8391576ad7a6..ae093be2dc4a 100644 --- a/pkg/meta/dump.go +++ b/pkg/meta/dump.go @@ -171,6 +171,12 @@ func (de *DumpedEntry) writeJsonWithOutEntry(bw *bufio.Writer, depth int) error return err } write(fmt.Sprintf("\n%s\"attr\": %s", fieldPrefix, data)) + if len(de.Xattrs) > 0 { + if data, err = json.Marshal(de.Xattrs); err != nil { + return err + } + write(fmt.Sprintf(",\n%s\"xattrs\": %s", fieldPrefix, data)) + } write(fmt.Sprintf(",\n%s\"entries\": {", fieldPrefix)) return nil } From 4accf06bc810ac4107e0c4406b79a5f5ae067f25 Mon Sep 17 00:00:00 2001 From: zhijian Date: Thu, 18 Nov 2021 20:08:56 +0800 Subject: [PATCH 5/5] Add progress bar for make snap and load entries; now progress bar will show estimated arrival time Fix the problem that the SQL statement of a single submission is too long, which causes the placeholder to exceed the upper limit --- pkg/meta/redis.go | 6 +++++- pkg/meta/sql.go | 48 ++++++++++++++++++++++++++++++++++++++++------ pkg/meta/tkv.go | 6 +++++- pkg/utils/utils.go | 3 +++ 4 files changed, 55 insertions(+), 8 deletions(-) diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index f6684580c0ff..3ca371a3b6b7 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -3275,6 +3275,8 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { counters := &DumpedCounters{} refs := make(map[string]int) + lProgress, lBar := utils.NewDynProgressBar("LoadEntry progress: ", false) + lBar.SetTotal(int64(len(entries)), false) maxNum := 100 pool := make(chan struct{}, maxNum) errCh := make(chan error, 100) @@ -3291,6 +3293,7 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { go func(entry *DumpedEntry) { defer func() { wg.Done() + lBar.Increment() <-pool }() if err = m.loadEntry(entry, counters, refs); err != nil { @@ -3309,7 +3312,8 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { return err case <-done: } - + lBar.SetTotal(0, true) + lProgress.Wait() logger.Infof("Dumped counters: %+v", *dm.Counters) logger.Infof("Loaded counters: %+v", *counters) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 5bd6752e4088..db6e0b4ec0d7 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -540,11 +540,23 @@ func (m *dbMeta) nextInode() (Ino, error) { } func mustInsert(s *xorm.Session, beans ...interface{}) error { - inserted, err := s.Insert(beans...) - if err == nil && int(inserted) < len(beans) { - err = fmt.Errorf("%d records not inserted: %+v", len(beans)-int(inserted), beans) + var start, end int + batchSize := 200 + for i := 0; i < len(beans)/batchSize; i++ { + end = start + batchSize + inserted, err := s.Insert(beans[start:end]...) + if err == nil && int(inserted) < end-start { + return fmt.Errorf("%d records not inserted: %+v", end-start-int(inserted), beans[start:end]) + } + start = end } - return err + if len(beans)%batchSize != 0 { + inserted, err := s.Insert(beans[end:]...) + if err == nil && int(inserted) < len(beans)-end { + return fmt.Errorf("%d records not inserted: %+v", len(beans)-end-int(inserted), beans[end:]) + } + } + return nil } var errBusy error @@ -2712,7 +2724,7 @@ func (m *dbMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth i if m.root == 1 { edges, ok = m.snap.edges[inode] if !ok { - return fmt.Errorf("no edge target for inode %d", inode) + logger.Warnf("no edge target for inode %d", inode) } } else { if err := m.engine.Find(&edges, &edge{Parent: inode}); err != nil { @@ -2772,11 +2784,23 @@ func (m *dbMeta) makeSnap() error { chunk: make(map[string]*chunk), } - bufferSize := 10000 + var total int64 + for _, s := range []interface{}{new(node), new(symlink), new(edge), new(xattr), new(chunk)} { + count, err := m.engine.Count(s) + if err != nil { + return err + } + total += count + } + progress, bar := utils.NewDynProgressBar("Make snap progress: ", false) + bar.SetTotal(total, false) + + bufferSize := 10000 if err := m.engine.BufferSize(bufferSize).Iterate(new(node), func(idx int, bean interface{}) error { n := bean.(*node) m.snap.node[n.Inode] = n + bar.Increment() return nil }); err != nil { return err @@ -2785,6 +2809,7 @@ func (m *dbMeta) makeSnap() error { if err := m.engine.BufferSize(bufferSize).Iterate(new(symlink), func(idx int, bean interface{}) error { s := bean.(*symlink) m.snap.symlink[s.Inode] = s + bar.Increment() return nil }); err != nil { return err @@ -2792,6 +2817,7 @@ func (m *dbMeta) makeSnap() error { if err := m.engine.BufferSize(bufferSize).Iterate(new(edge), func(idx int, bean interface{}) error { e := bean.(*edge) m.snap.edges[e.Parent] = append(m.snap.edges[e.Parent], e) + bar.Increment() return nil }); err != nil { return err @@ -2800,6 +2826,7 @@ func (m *dbMeta) makeSnap() error { if err := m.engine.BufferSize(bufferSize).Iterate(new(xattr), func(idx int, bean interface{}) error { x := bean.(*xattr) m.snap.xattr[x.Inode] = append(m.snap.xattr[x.Inode], x) + bar.Increment() return nil }); err != nil { return err @@ -2808,10 +2835,13 @@ func (m *dbMeta) makeSnap() error { if err := m.engine.BufferSize(bufferSize).Iterate(new(chunk), func(idx int, bean interface{}) error { c := bean.(*chunk) m.snap.chunk[fmt.Sprintf("%d-%d", c.Inode, c.Indx)] = c + bar.Increment() return nil }); err != nil { return err } + bar.SetTotal(0, true) + progress.Wait() return nil } @@ -3066,6 +3096,8 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { } refs := make(map[uint64]*chunkRef) + lProgress, lBar := utils.NewDynProgressBar("LoadEntry progress: ", false) + lBar.SetTotal(int64(len(entries)), false) maxNum := 100 pool := make(chan struct{}, maxNum) errCh := make(chan error, 100) @@ -3081,6 +3113,7 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { wg.Add(1) go func(entry *DumpedEntry) { defer func() { + lBar.Increment() wg.Done() <-pool }() @@ -3101,6 +3134,9 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { case <-done: } + lBar.SetTotal(0, true) + lProgress.Wait() + logger.Infof("Dumped counters: %+v", *dm.Counters) logger.Infof("Loaded counters: %+v", *counters) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 05ba320752ca..d0dc65b84360 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2764,6 +2764,8 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { } refs := make(map[string]int64) + lProgress, lBar := utils.NewDynProgressBar("LoadEntry progress: ", false) + lBar.SetTotal(int64(len(entries)), false) maxNum := 100 pool := make(chan struct{}, maxNum) errCh := make(chan error, 100) @@ -2780,6 +2782,7 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { go func(entry *DumpedEntry) { defer func() { wg.Done() + lBar.Increment() <-pool }() if err = m.loadEntry(entry, counters, refs); err != nil { @@ -2798,7 +2801,8 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { return err case <-done: } - + lBar.SetTotal(0, true) + lProgress.Wait() logger.Infof("Dumped counters: %+v", *dm.Counters) logger.Infof("Loaded counters: %+v", *counters) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 26945eb34be3..88a5e7483a31 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -63,6 +63,9 @@ func NewDynProgressBar(title string, quiet bool) (*mpb.Progress, *mpb.Bar) { ), mpb.AppendDecorators( decor.OnComplete(decor.Percentage(decor.WC{W: 5}), "done"), + decor.OnComplete( + decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 6}), "", + ), ), ) return progress, bar