diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 0d12e62fcd37..833a684f7bf4 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2992,31 +2992,32 @@ func (m *redisMeta) DumpMeta(w io.Writer, root Ino) (err error) { return bw.Flush() } -func (m *redisMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[string]int) error { +func (m *redisMeta) loadEntry(e *DumpedEntry, p redis.Pipeliner, tryExec func(), cs *DumpedCounters, refs map[string]int) { inode := e.Attr.Inode logger.Debugf("Loading entry inode %d name %s", inode, unescape(e.Name)) ctx := Background attr := loadAttr(e.Attr) attr.Parent = e.Parent - p := m.rdb.Pipeline() - batch := 10000 + batch := 100 if attr.Typ == TypeFile { attr.Length = e.Attr.Length for _, c := range e.Chunks { - if len(c.Slices) == 0 { - continue - } slices := make([]string, 0, len(c.Slices)) - m.Lock() for _, s := range c.Slices { slices = append(slices, string(marshalSlice(s.Pos, s.Chunkid, s.Size, s.Off, s.Len))) refs[m.sliceKey(s.Chunkid, s.Size)]++ if cs.NextChunk < int64(s.Chunkid) { cs.NextChunk = int64(s.Chunkid) } + if len(slices) > batch { + p.RPush(ctx, m.chunkKey(inode, c.Index), slices) + tryExec() + slices = slices[:0] + } + } + if len(slices) > 0 { + p.RPush(ctx, m.chunkKey(inode, c.Index), slices) } - m.Unlock() - p.RPush(ctx, m.chunkKey(inode, c.Index), slices) } } else if attr.Typ == TypeDirectory { attr.Length = 4 << 10 @@ -3026,6 +3027,7 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[strin dentries[string(unescape(k))] = m.packEntry(typeFromString(entry.Attr.Type), entry.Attr.Inode) if len(dentries) >= batch { p.HSet(ctx, m.entryKey(inode), dentries) + tryExec() dentries = make(map[string]interface{}, batch) } } @@ -3037,7 +3039,6 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[strin attr.Length = uint64(len(symL)) p.Set(ctx, m.symKey(inode), symL, 0) } - m.Lock() if inode > 1 && inode != TrashInode { cs.UsedSpace += align4K(attr.Length) cs.UsedInodes += 1 @@ -3051,8 +3052,6 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[strin cs.NextTrash = int64(inode) - TrashInode } } - m.Unlock() - if len(e.Xattrs) > 0 { xattrs := make(map[string]interface{}) for _, x := range e.Xattrs { @@ -3061,13 +3060,10 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[strin p.HSet(ctx, m.xattrKey(inode), xattrs) } p.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0) - _, err := p.Exec(ctx) - return err } -func (m *redisMeta) LoadMeta(r io.Reader) error { +func (m *redisMeta) LoadMeta(r io.Reader) (err error) { ctx := Background - var err error if _, ok := m.rdb.(*redis.ClusterClient); ok { err = m.scan(ctx, "*", func(s []string) error { return fmt.Errorf("found key with same prefix: %s", s) @@ -3085,6 +3081,7 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { } } + logger.Infoln("Reading file ...") dec := json.NewDecoder(r) dm := &DumpedMeta{} if err = dec.Decode(dm); err != nil { @@ -3117,46 +3114,38 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { counters := &DumpedCounters{} refs := make(map[string]int) bar = progress.AddCountBar("Loaded entries", int64(len(entries))) - maxNum := 100 - pool := make(chan struct{}, maxNum) - errCh := make(chan error, 100) - done := make(chan struct{}, 1) - var wg sync.WaitGroup - for _, entry := range entries { - select { - case err = <-errCh: - return err - default: - } - pool <- struct{}{} - wg.Add(1) - go func(entry *DumpedEntry) { - defer func() { - wg.Done() - bar.Increment() - <-pool - }() - if err = m.loadEntry(entry, counters, refs); err != nil { - errCh <- err + p := m.rdb.TxPipeline() + tryExec := func() { + if p.Len() > 1000 { + if rs, err := p.Exec(ctx); err != nil { + for i, r := range rs { + if r.Err() != nil { + logger.Errorf("failed command %d %+v: %s", i, r, r.Err()) + break + } + } + panic(err) } - }(entry) + } } - - go func() { - wg.Wait() - close(done) + defer func() { + if e := recover(); e != nil { + if ee, ok := e.(error); ok { + err = ee + } else { + panic(e) + } + } }() - - select { - case err = <-errCh: - return err - case <-done: + for _, entry := range entries { + bar.Increment() + m.loadEntry(entry, p, tryExec, counters, refs) + tryExec() } progress.Done() + logger.Infof("Dumped counters: %+v", *dm.Counters) logger.Infof("Loaded counters: %+v", *counters) - - p := m.rdb.Pipeline() p.Set(ctx, m.setting(), format, 0) cs := make(map[string]interface{}) cs[usedSpace] = counters.UsedSpace @@ -3169,6 +3158,11 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { if len(dm.DelFiles) > 0 { zs := make([]*redis.Z, 0, len(dm.DelFiles)) for _, d := range dm.DelFiles { + if len(zs) > 100 { + p.ZAdd(ctx, m.delfiles(), zs...) + tryExec() + zs = zs[:0] + } zs = append(zs, &redis.Z{ Score: float64(d.Expire), Member: m.toDelete(d.Inode, d.Length), @@ -3179,6 +3173,11 @@ func (m *redisMeta) LoadMeta(r io.Reader) error { slices := make(map[string]interface{}) for k, v := range refs { if v > 1 { + if len(slices) > 100 { + p.HSet(ctx, m.sliceRefs(), slices) + tryExec() + slices = make(map[string]interface{}) + } slices[k] = v - 1 } }