Speed up dump for sql engine #1006

Merged: 5 commits, Nov 18, 2021
Changes from 2 commits
21 changes: 0 additions & 21 deletions pkg/meta/dump.go
@@ -211,27 +211,6 @@ type DumpedMeta struct {
FSTree *DumpedEntry `json:",omitempty"`
}

func (dm *DumpedMeta) writeJSON(w io.Writer) error {
tree := dm.FSTree
dm.FSTree = nil
data, err := json.MarshalIndent(dm, "", jsonIndent)
if err != nil {
return err
}
bw := bufio.NewWriterSize(w, jsonWriteSize)
if _, err = bw.Write(append(data[:len(data)-2], ',')); err != nil { // delete \n}
return err
}
tree.Name = "FSTree"
if err = tree.writeJSON(bw, 1); err != nil {
return err
}
if _, err = bw.WriteString("\n}\n"); err != nil {
return err
}
return bw.Flush()
}

func (dm *DumpedMeta) writeJsonWithOutTree(w io.Writer) (*bufio.Writer, error) {
dm.FSTree = nil
data, err := json.MarshalIndent(dm, "", jsonIndent)
18 changes: 18 additions & 0 deletions pkg/meta/load_dump_test.go
@@ -113,6 +113,24 @@ func TestLoadDump(t *testing.T) {
t.Fatalf("dump meta: %s", err)
}
})

t.Run("Metadata Engine: SQLite --SubDir d1", func(t *testing.T) {
os.Remove("test10.db")
m := testLoad(t, "sqlite3://test10.db", sampleFile)
fp, err := os.OpenFile("sqlite3_subdir.dump", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
t.Fatalf("open file: %s", "sqlite3_subdir.dump")
}
defer fp.Close()
switch r := m.(type) {
case *dbMeta:
r.root = 3
}
if err = m.DumpMeta(fp); err != nil {
t.Fatalf("dump meta: %s", err)
}
})

t.Run("Metadata Engine: TKV", func(t *testing.T) {
os.Remove(settingPath)
m := testLoad(t, "memkv://test/jfs", sampleFile)
265 changes: 231 additions & 34 deletions pkg/meta/sql.go
@@ -18,6 +18,7 @@
package meta

import (
"bufio"
"bytes"
"database/sql"
"encoding/json"
@@ -150,6 +151,15 @@ type dbMeta struct {
freeMu sync.Mutex
freeInodes freeID
freeChunks freeID

snap *dbSnap
}
type dbSnap struct {
node map[Ino]*node
symlink map[Ino]*symlink
xattr map[Ino][]*xattr
edges map[Ino][]*edge
chunk map[string]*chunk
}
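
Note: the snapshot caches every metadata table in memory so the dump no longer issues one query per entry; chunks are keyed by an "inode-index" string. A minimal sketch of that lookup as a hypothetical helper (the patch inlines it with fmt.Sprintf):

// Hypothetical helper (not in the patch): fetch a chunk from the snapshot
// using the same "inode-index" key format the dump code builds inline.
func (s *dbSnap) getChunk(inode Ino, indx uint32) (*chunk, bool) {
	c, ok := s.chunk[fmt.Sprintf("%d-%d", inode, indx)]
	return c, ok
}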

func newSQLMeta(driver, addr string, conf *Config) (Meta, error) {
@@ -2637,35 +2647,170 @@ func (m *dbMeta) dumpEntry(inode Ino) (*DumpedEntry, error) {
return nil
})
}
func (m *dbMeta) dumpEntryFast(inode Ino) (*DumpedEntry, error) {
e := &DumpedEntry{}
n, ok := m.snap.node[inode]
if !ok {
return nil, fmt.Errorf("inode %d not found", inode)
}
attr := &Attr{}
m.parseAttr(n, attr)
e.Attr = dumpAttr(attr)
e.Attr.Inode = inode

func (m *dbMeta) dumpDir(inode Ino, showProgress func(totalIncr, currentIncr int64)) (map[string]*DumpedEntry, error) {
var edges []edge
if err := m.engine.Find(&edges, &edge{Parent: inode}); err != nil {
return nil, err
rows, ok := m.snap.xattr[inode]
if ok && len(rows) > 0 {
xattrs := make([]*DumpedXattr, 0, len(rows))
for _, x := range rows {
xattrs = append(xattrs, &DumpedXattr{x.Name, string(x.Value)})
}
sort.Slice(xattrs, func(i, j int) bool { return xattrs[i].Name < xattrs[j].Name })
e.Xattrs = xattrs
}

if attr.Typ == TypeFile {
for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ {
c, ok := m.snap.chunk[fmt.Sprintf("%d-%d", inode, indx)]
if !ok {
return nil, fmt.Errorf("no found chunk target for inode %d indx %d", inode, indx)
}
ss := readSliceBuf(c.Slices)
slices := make([]*DumpedSlice, 0, len(ss))
for _, s := range ss {
slices = append(slices, &DumpedSlice{Chunkid: s.chunkid, Pos: s.pos, Size: s.size, Off: s.off, Len: s.len})
}
e.Chunks = append(e.Chunks, &DumpedChunk{indx, slices})
}
} else if attr.Typ == TypeSymlink {
l, ok := m.snap.symlink[inode]
if !ok {
return nil, fmt.Errorf("no link target for inode %d", inode)
}
e.Symlink = l.Target
}
return e, nil
}

func (m *dbMeta) dumpDir(inode Ino, tree *DumpedEntry, bw *bufio.Writer, depth int, showProgress func(totalIncr, currentIncr int64)) error {
bwWrite := func(s string) {
if _, err := bw.WriteString(s); err != nil {
panic(err)
}
}
var edges []*edge
var err error
var ok bool
if m.root == 1 {
edges, ok = m.snap.edges[inode]
if !ok {
return fmt.Errorf("no edge target for inode %d", inode)
}
} else {
if err := m.engine.Find(&edges, &edge{Parent: inode}); err != nil {
return err
}
}

if showProgress != nil {
showProgress(int64(len(edges)), 0)
}
entries := make(map[string]*DumpedEntry)
for _, e := range edges {
entry, err := m.dumpEntry(e.Inode)
if err := tree.writeJsonWithOutEntry(bw, depth); err != nil {
return err
}
sort.Slice(edges, func(i, j int) bool { return edges[i].Name < edges[j].Name })

for idx, e := range edges {
var entry *DumpedEntry
if m.root == 1 {
entry, err = m.dumpEntryFast(e.Inode)
} else {
entry, err = m.dumpEntry(e.Inode)
}
if err != nil {
return nil, err
return err
[Review — Contributor] Should we skip the edge and continue (log it as a warning)?
[Reply — Author] It has been modified.

}
entry.Name = e.Name
if e.Type == TypeDirectory {
if entry.Entries, err = m.dumpDir(e.Inode, showProgress); err != nil {
return nil, err
}
err = m.dumpDir(e.Inode, entry, bw, depth+2, showProgress)
} else {
err = entry.writeJSON(bw, depth+2)
}
if err != nil {
return err
}
if idx != len(edges)-1 {
bwWrite(",")
}
entries[e.Name] = entry
if showProgress != nil {
showProgress(0, 1)
}
}
return entries, nil
bwWrite(fmt.Sprintf("\n%s}\n%s}", strings.Repeat(jsonIndent, depth+1), strings.Repeat(jsonIndent, depth)))
return nil
}
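
Note: the rewritten dumpDir streams the tree depth-first straight to the bufio.Writer instead of materializing a map of all children, which is where the memory savings come from. A stripped-down sketch of the sibling-comma pattern used above (writeEntry is a stand-in for the real writeJSON/writeJsonWithOutEntry helpers):

// Minimal sketch (assumed helper name writeEntry): emit sorted children,
// separating siblings with commas and leaving none after the last one.
func writeChildren(bw *bufio.Writer, names []string, writeEntry func(name string) error) error {
	sort.Strings(names)
	for i, name := range names {
		if err := writeEntry(name); err != nil {
			return err
		}
		if i != len(names)-1 {
			if _, err := bw.WriteString(","); err != nil {
				return err
			}
		}
	}
	return nil
}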

func (m *dbMeta) DumpMeta(w io.Writer) error {
func (m *dbMeta) makeSnap() error {
m.snap = &dbSnap{
node: make(map[Ino]*node),
symlink: make(map[Ino]*symlink),
xattr: make(map[Ino][]*xattr),
edges: make(map[Ino][]*edge),
chunk: make(map[string]*chunk),
}

bufferSize := 10000

if err := m.engine.BufferSize(bufferSize).Iterate(new(node), func(idx int, bean interface{}) error {
n := bean.(*node)
m.snap.node[n.Inode] = n
return nil
}); err != nil {
return err
}

if err := m.engine.BufferSize(bufferSize).Iterate(new(symlink), func(idx int, bean interface{}) error {
s := bean.(*symlink)
m.snap.symlink[s.Inode] = s
return nil
}); err != nil {
return err
}
if err := m.engine.BufferSize(bufferSize).Iterate(new(edge), func(idx int, bean interface{}) error {
e := bean.(*edge)
m.snap.edges[e.Parent] = append(m.snap.edges[e.Parent], e)
return nil
}); err != nil {
return err
}

if err := m.engine.BufferSize(bufferSize).Iterate(new(xattr), func(idx int, bean interface{}) error {
x := bean.(*xattr)
m.snap.xattr[x.Inode] = append(m.snap.xattr[x.Inode], x)
return nil
}); err != nil {
return err
}

if err := m.engine.BufferSize(bufferSize).Iterate(new(chunk), func(idx int, bean interface{}) error {
c := bean.(*chunk)
m.snap.chunk[fmt.Sprintf("%d-%d", c.Inode, c.Indx)] = c
return nil
}); err != nil {
return err
}
return nil
}
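
Note: makeSnap performs one buffered full-table scan per table; xorm's Iterate with BufferSize(10000) fetches rows in batches rather than loading the whole result set at once. The five calls share the same shape, so a hypothetical refactor (not in the patch) could factor the scan out:

// Hypothetical refactor (not in the patch): one buffered scan per table,
// with a collector closure deciding how to index each row.
func (m *dbMeta) scanTable(bean interface{}, collect func(row interface{})) error {
	return m.engine.BufferSize(10000).Iterate(bean, func(idx int, row interface{}) error {
		collect(row)
		return nil
	})
}

// Usage: m.scanTable(new(node), func(r interface{}) {
// 	n := r.(*node)
// 	m.snap.node[n.Inode] = n
// })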

func (m *dbMeta) DumpMeta(w io.Writer) (err error) {
defer func() {
if p := recover(); p != nil {
if e, ok := p.(error); ok {
err = e
} else {
err = fmt.Errorf("DumpMeta error: %v", p)
}
}
}()
var drows []delfile
if err := m.engine.Find(&drows); err != nil {
return err
@@ -2675,32 +2820,24 @@ func (m *dbMeta) DumpMeta(w io.Writer) error {
dels = append(dels, &DumpedDelFile{row.Inode, row.Length, row.Expire})
}

tree, err := m.dumpEntry(m.root)
if err != nil {
return err
var tree *DumpedEntry
if m.root == 1 {
if err = m.makeSnap(); err != nil {
return fmt.Errorf("Fetch all metadata from DB: %s", err)
}
tree, err = m.dumpEntryFast(m.root)
} else {
tree, err = m.dumpEntry(m.root)
}

var total int64 = 1 // root
progress, bar := utils.NewDynProgressBar("Dump dir progress: ", false)
bar.Increment()
if tree.Entries, err = m.dumpDir(m.root, func(totalIncr, currentIncr int64) {
total += totalIncr
bar.SetTotal(total, false)
bar.IncrInt64(currentIncr)
}); err != nil {
if err != nil {
return err
}
if bar.Current() != total {
logger.Warnf("Dumped %d / total %d, some entries are not dumped", bar.Current(), total)
}
bar.SetTotal(0, true)
progress.Wait()

tree.Name = "FSTree"
format, err := m.Load()
if err != nil {
return err
}

var crows []counter
if err = m.engine.Find(&crows); err != nil {
return err
@@ -2741,7 +2878,34 @@ func (m *dbMeta) DumpMeta(w io.Writer) error {
dels,
tree,
}
return dm.writeJSON(w)

bw, err := dm.writeJsonWithOutTree(w)
if err != nil {
return err
}

var total int64 = 1 // root
progress, bar := utils.NewDynProgressBar("Dump dir progress: ", false)
bar.Increment()

if err = m.dumpDir(m.root, tree, bw, 1, func(totalIncr, currentIncr int64) {
total += totalIncr
bar.SetTotal(total, false)
bar.IncrInt64(currentIncr)
}); err != nil {
return err
}

if _, err = bw.WriteString("\n}\n"); err != nil {
return err
}
if bar.Current() != total {
logger.Warnf("Dumped %d / total %d, some entries are not dumped", bar.Current(), total)
}
bar.SetTotal(0, true)
progress.Wait()

return bw.Flush()
}
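
Note on error handling: bwWrite in dumpDir panics on a failed write, and the deferred recover at the top of DumpMeta turns that panic back into the function's returned error, so the recursive writer does not have to thread an error through every call. A self-contained sketch of the same pattern:

// Minimal sketch of the panic-to-error pattern used by DumpMeta and bwWrite.
func dump(w io.Writer) (err error) {
	defer func() {
		if p := recover(); p != nil {
			if e, ok := p.(error); ok {
				err = e
			} else {
				err = fmt.Errorf("dump error: %v", p)
			}
		}
	}()
	bw := bufio.NewWriter(w)
	mustWrite := func(s string) {
		if _, werr := bw.WriteString(s); werr != nil {
			panic(werr) // recovered above and returned as err
		}
	}
	mustWrite("{")
	mustWrite("}\n")
	return bw.Flush()
}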

func (m *dbMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[uint64]*chunkRef) error {
@@ -2772,11 +2936,13 @@ func (m *dbMeta) loadEntry(e *DumpedEntry, cs *DumpedCounters, refs map[uint64]*
slices := make([]byte, 0, sliceBytes*len(c.Slices))
for _, s := range c.Slices {
slices = append(slices, marshalSlice(s.Pos, s.Chunkid, s.Size, s.Off, s.Len)...)
m.Lock()
if refs[s.Chunkid] == nil {
refs[s.Chunkid] = &chunkRef{s.Chunkid, s.Size, 1}
} else {
refs[s.Chunkid].Refs++
}
m.Unlock()
if cs.NextChunk <= int64(s.Chunkid) {
cs.NextChunk = int64(s.Chunkid) + 1
}
@@ -2882,11 +3048,42 @@ func (m *dbMeta) LoadMeta(r io.Reader) error {
NextSession: 1,
}
refs := make(map[uint64]*chunkRef)

maxNum := 100
pool := make(chan struct{}, maxNum)
errCh := make(chan error, 100)
done := make(chan struct{}, 1)
var wg sync.WaitGroup
for _, entry := range entries {
if err = m.loadEntry(entry, counters, refs); err != nil {
select {
case err = <-errCh:
return err
default:
}
pool <- struct{}{}
wg.Add(1)
go func(entry *DumpedEntry) {
defer func() {
wg.Done()
<-pool
}()
if err := m.loadEntry(entry, counters, refs); err != nil {
errCh <- err
}
}(entry)
}

go func() {
wg.Wait()
close(done)
}()

select {
case err = <-errCh:
return err
case <-done:
}
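
Note: the loader now fans entries out to at most 100 concurrent goroutines, using a buffered channel as a semaphore plus separate error and done channels; loadEntry itself now takes m.Lock around the shared refs map (see the hunk above), which keeps this safe. A more compact alternative (not in the patch; assumes golang.org/x/sync/errgroup and that entries is a []*DumpedEntry):

// Hypothetical alternative (not in the patch): errgroup bounds concurrency
// and Wait returns the first error from any loadEntry call.
g := new(errgroup.Group)
g.SetLimit(100)
for _, entry := range entries {
	entry := entry // pin the loop variable (required before Go 1.22)
	g.Go(func() error {
		return m.loadEntry(entry, counters, refs)
	})
}
if err := g.Wait(); err != nil {
	return err
}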

logger.Infof("Dumped counters: %+v", *dm.Counters)
logger.Infof("Loaded counters: %+v", *counters)
