diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 70dc5154174f..5c05e1a9cc68 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -17,6 +17,7 @@ package meta import ( + "bytes" "encoding/binary" "encoding/json" "fmt" @@ -47,6 +48,11 @@ const ( ) var ( + DirBatchNum = map[string]int{ + "redis": 4096, + "kv": 4096, + "db": 40960, + } maxCompactSlices = 1000 maxSlices = 2500 inodeNeedPrefetch = uint64(utils.JitterIt(inodeBatch * 0.1)) // Add jitter to reduce probability of txn conflicts @@ -125,6 +131,8 @@ type engine interface { doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, rule *aclAPI.Rule) syscall.Errno cacheACLs(ctx Context) error + + newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler } type trashSliceScan func(ss []Slice, ts int64) (clean bool, err error) @@ -2889,3 +2897,178 @@ func inGroup(ctx Context, gid uint32) bool { } return false } + +type DirHandler interface { + List(ctx Context, offset int) ([]*Entry, syscall.Errno) + Insert(inode Ino, name string, attr *Attr) + Delete(name string) + Read(offset int) + Close() +} + +func (m *baseMeta) NewDirHandler(ctx Context, inode Ino, plus bool, initEntries []*Entry) (DirHandler, syscall.Errno) { + var attr Attr + var st syscall.Errno + defer func() { + if st == 0 { + m.touchAtime(ctx, inode, &attr) + } + }() + + inode = m.checkRoot(inode) + if st = m.GetAttr(ctx, inode, &attr); st != 0 { + return nil, st + } + defer m.timeit("NewDirHandler", time.Now()) + var mmask uint8 = MODE_MASK_R + if plus { + mmask |= MODE_MASK_X + } + + if st = m.Access(ctx, inode, mmask, &attr); st != 0 { + return nil, st + } + if inode == m.root { + attr.Parent = m.root + } + + initEntries = append(initEntries, &Entry{ + Inode: inode, + Name: []byte("."), + Attr: &Attr{Typ: TypeDirectory}, + }, &Entry{ + Inode: attr.Parent, + Name: []byte(".."), + Attr: &Attr{Typ: TypeDirectory}, + }) + + return m.en.newDirHandler(inode, plus, initEntries), 0 +} + +type dirBatch struct { + isEnd bool + offset int + cursor interface{} + maxName []byte + entries []*Entry + indexes map[string]int +} + +func (b *dirBatch) contain(offset int) bool { + if b == nil { + return false + } + return b.offset <= offset && offset < b.offset+len(b.entries) || (len(b.entries) == 0 && b.offset == offset) +} + +func (b *dirBatch) predecessor(offset int) bool { + return b.offset+len(b.entries) == offset +} + +type dirFetcher func(ctx Context, inode Ino, cursor interface{}, offset, limit int, plus bool) (interface{}, []*Entry, error) + +type dirHandler struct { + sync.Mutex + inode Ino + plus bool + initEntries []*Entry + batch *dirBatch + fetcher dirFetcher + readOff int + batchNum int +} + +func (h *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) { + var cursor interface{} + if h.batch != nil && h.batch.predecessor(offset) { + if h.batch.isEnd { + return h.batch, nil + } + cursor = h.batch.cursor + } + nextCursor, entries, err := h.fetcher(ctx, h.inode, cursor, offset, h.batchNum, h.plus) + if err != nil { + return nil, err + } + if entries == nil { + entries = []*Entry{} + nextCursor = cursor + } + indexes := make(map[string]int, len(entries)) + maxName := []byte("") + for i, e := range entries { + indexes[string(e.Name)] = i + if bytes.Compare(e.Name, maxName) > 0 { + maxName = e.Name + } + } + return &dirBatch{isEnd: len(entries) < h.batchNum, offset: offset, cursor: nextCursor, entries: entries, indexes: indexes, maxName: maxName}, nil +} + +func (h *dirHandler) 
List(ctx Context, offset int) ([]*Entry, syscall.Errno) { + var prefix []*Entry + if offset < len(h.initEntries) { + prefix = h.initEntries[offset:] + offset = 0 + } else { + offset -= len(h.initEntries) + } + + var err error + h.Lock() + defer h.Unlock() + if !h.batch.contain(offset) { + h.batch, err = h.fetch(ctx, offset) + } + + if err != nil { + return nil, errno(err) + } + + h.readOff = h.batch.offset + len(h.batch.entries) + if len(prefix) > 0 { + return append(prefix, h.batch.entries...), 0 + } + return h.batch.entries[offset-h.batch.offset:], 0 +} + +func (h *dirHandler) delete(name string) { + if h.batch == nil || len(h.batch.entries) == 0 { + return + } + + if idx, ok := h.batch.indexes[name]; ok && idx >= h.readOff { + delete(h.batch.indexes, name) + n := len(h.batch.entries) + if idx < n-1 { + // TODO: sorted + h.batch.entries[idx] = h.batch.entries[n-1] + h.batch.indexes[string(h.batch.entries[idx].Name)] = idx + } + h.batch.entries = h.batch.entries[:n-1] + } +} + +func (h *dirHandler) Insert(inode Ino, name string, attr *Attr) { + h.Lock() + defer h.Unlock() + if h.batch == nil { + return + } + if h.batch.isEnd || bytes.Compare([]byte(name), h.batch.maxName) < 0 { + // TODO: sorted + h.batch.entries = append(h.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr}) + h.batch.indexes[name] = len(h.batch.entries) - 1 + } +} + +func (h *dirHandler) Read(offset int) { + h.readOff = offset - len(h.initEntries) +} + +func (h *dirHandler) Close() { + h.Lock() + h.batch = nil + h.readOff = 0 + h.Unlock() +} diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 3fc721179140..e9e6fb12bcdb 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -376,6 +376,8 @@ type Meta interface { Link(ctx Context, inodeSrc, parent Ino, name string, attr *Attr) syscall.Errno // Readdir returns all entries for given directory, which include attributes if plus is true. Readdir(ctx Context, inode Ino, wantattr uint8, entries *[]*Entry) syscall.Errno + // NewDirHandler returns a stream for directory entries. + NewDirHandler(ctx Context, inode Ino, plus bool, initEntries []*Entry) (DirHandler, syscall.Errno) // Create creates a file in a directory with given name. Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, flags uint32, inode *Ino, attr *Attr) syscall.Errno // Open checks permission on a node and track it as open. 
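Usage sketch (illustrative, not part of the patch): pkg/vfs drives the new stream by creating one handler per open directory, calling List with the kernel-supplied offset, reporting consumed offsets back through Read, and releasing it with Close. Assuming the pkg/meta API introduced above, a caller could look like the following; the helper name listAll is hypothetical.

package example // illustrative sketch only, not part of the patch

import (
	"syscall"

	"github.com/juicedata/juicefs/pkg/meta"
)

// listAll drains a directory stream batch by batch. With plus=true the entries
// come back with attributes filled, matching readdirplus.
func listAll(ctx meta.Context, m meta.Meta, dir meta.Ino) ([]*meta.Entry, syscall.Errno) {
	h, st := m.NewDirHandler(ctx, dir, true, nil)
	if st != 0 {
		return nil, st
	}
	defer h.Close()

	var all []*meta.Entry
	off := 0
	for {
		batch, st := h.List(ctx, off) // includes "." and ".." on the first call
		if st != 0 {
			return nil, st
		}
		if len(batch) == 0 { // reached the end of the directory
			return all, 0
		}
		all = append(all, batch...)
		off += len(batch)
		h.Read(off) // tell the handler how far the caller has consumed
	}
}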
diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index c875eb97ef69..7892c500a080 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -1957,6 +1957,28 @@ func (m *redisMeta) doLink(ctx Context, inode, parent Ino, name string, attr *At }, m.inodeKey(parent), m.entryKey(parent), m.inodeKey(inode))) } +func (m *redisMeta) fillAttr(ctx Context, es []*Entry) error { + if len(es) == 0 { + return nil + } + var keys = make([]string, len(es)) + for i, e := range es { + keys[i] = m.inodeKey(e.Inode) + } + rs, err := m.rdb.MGet(ctx, keys...).Result() + if err != nil { + return err + } + for j, re := range rs { + if re != nil { + if a, ok := re.(string); ok { + m.parseAttr([]byte(a), es[j].Attr) + } + } + } + return nil +} + func (m *redisMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno { var stop = errors.New("stop") err := m.hscan(ctx, m.entryKey(inode), func(keys []string) error { @@ -1988,28 +2010,10 @@ func (m *redisMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*En } if plus != 0 && len(*entries) != 0 { - fillAttr := func(es []*Entry) error { - var keys = make([]string, len(es)) - for i, e := range es { - keys[i] = m.inodeKey(e.Inode) - } - rs, err := m.rdb.MGet(ctx, keys...).Result() - if err != nil { - return err - } - for j, re := range rs { - if re != nil { - if a, ok := re.(string); ok { - m.parseAttr([]byte(a), es[j].Attr) - } - } - } - return nil - } batchSize := 4096 nEntries := len(*entries) if nEntries <= batchSize { - err = fillAttr(*entries) + err = m.fillAttr(ctx, *entries) } else { indexCh := make(chan []*Entry, 10) var wg sync.WaitGroup @@ -2018,7 +2022,7 @@ func (m *redisMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*En go func() { defer wg.Done() for es := range indexCh { - e := fillAttr(es) + e := m.fillAttr(ctx, es) if e != nil { err = e break @@ -4555,3 +4559,135 @@ func (m *redisMeta) loadDumpedACLs(ctx Context) error { return tx.Set(ctx, m.prefix+aclCounter, maxId, 0).Err() }, m.inodeKey(RootInode)) } + +func (m *redisMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler { + return &redisDirHandler{ + en: m, + inode: inode, + plus: plus, + initEntries: entries, + batchNum: DirBatchNum["redis"], + } +} + +type redisDirHandler struct { + sync.Mutex + inode Ino + plus bool + en *redisMeta + initEntries []*Entry + entries []*Entry + indexes map[string]int + readOff int + batchNum int +} + +func (s *redisDirHandler) Close() { + s.Lock() + s.entries = nil + s.readOff = 0 + s.Unlock() +} + +func (s *redisDirHandler) Delete(name string) { + s.Lock() + defer s.Unlock() + + if len(s.entries) == 0 { + return + } + + if idx, ok := s.indexes[name]; ok && idx >= s.readOff { + delete(s.indexes, name) + n := len(s.entries) + if idx < n-1 { + // TODO: sorted + s.entries[idx] = s.entries[n-1] + s.indexes[string(s.entries[idx].Name)] = idx + } + s.entries = s.entries[:n-1] + } +} + +func (s *redisDirHandler) Insert(inode Ino, name string, attr *Attr) { + s.Lock() + defer s.Unlock() + + if len(s.entries) == 0 { + return + } + + // TODO: sorted + s.entries = append(s.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr}) + s.indexes[name] = len(s.entries) - 1 +} + +func (s *redisDirHandler) List(ctx Context, offset int) ([]*Entry, syscall.Errno) { + var prefix []*Entry + if offset < len(s.initEntries) { + prefix = s.initEntries[offset:] + offset = 0 + } else { + offset -= len(s.initEntries) + } + + s.Lock() + if s.entries == nil { + var entries []*Entry + err := s.en.hscan(ctx, 
s.en.entryKey(s.inode), func(keys []string) error { + newEntries := make([]Entry, len(keys)/2) + newAttrs := make([]Attr, len(keys)/2) + for i := 0; i < len(keys); i += 2 { + typ, ino := s.en.parseEntry([]byte(keys[i+1])) + if keys[i] == "" { + logger.Errorf("Corrupt entry with empty name: inode %d parent %d", ino, s.inode) + continue + } + ent := &newEntries[i/2] + ent.Inode = ino + ent.Name = []byte(keys[i]) + ent.Attr = &newAttrs[i/2] + ent.Attr.Typ = typ + entries = append(entries, ent) + } + return nil + }) + if err != nil { + return nil, errno(err) + } + + if s.en.conf.SortDir { + sort.Slice(entries, func(i, j int) bool { + return string(entries[i].Name) < string(entries[j].Name) + }) + } + if s.plus { + if err := s.en.fillAttr(ctx, entries); err != nil { + return nil, errno(err) + } + } + s.entries = entries + + indexes := make(map[string]int, len(entries)) + for i, e := range entries { + indexes[string(e.Name)] = i + } + s.indexes = indexes + } + s.Unlock() + + size := len(s.entries) - offset + if size > s.batchNum { + size = s.batchNum + } + s.readOff = offset + size + entries := s.entries[offset : offset+size] + if len(prefix) > 0 { + entries = append(prefix, entries...) + } + return entries, 0 +} + +func (s *redisDirHandler) Read(offset int) { + s.readOff = offset - len(s.initEntries) +} diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 9803051fe54d..4cb2d10fd094 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -4446,3 +4446,94 @@ func (m *dbMeta) loadDumpedACLs(ctx Context) error { return nil }) } + +type dbDirHandler struct { + dirHandler +} + +func (h *dbDirHandler) Insert(inode Ino, name string, attr *Attr) { + h.Lock() + defer h.Unlock() + if h.batch == nil { + return + } + if h.batch.isEnd || bytes.Compare([]byte(name), h.batch.maxName) < 0 { + h.batch.entries = append(h.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr}) + h.batch.indexes[name] = len(h.batch.entries) - 1 + } +} + +func (h *dbDirHandler) Delete(name string) { + h.Lock() + defer h.Unlock() + + h.dirHandler.delete(name) + if h.batch != nil && !h.batch.isEnd && bytes.Compare(h.batch.maxName, []byte(name)) > 0 && h.batch.cursor != nil { + h.batch.cursor = h.batch.cursor.(int) - 1 + } +} + +func (m *dbMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler { + h := &dbDirHandler{ + dirHandler: dirHandler{ + inode: inode, + plus: plus, + initEntries: entries, + fetcher: m.getDirFetcher(), + batchNum: DirBatchNum["db"], + }, + } + h.batch, _ = h.fetch(Background, 0) + return h +} + +func (m *dbMeta) getDirFetcher() dirFetcher { + return func(ctx Context, inode Ino, cursor interface{}, offset, limit int, plus bool) (interface{}, []*Entry, error) { + entries := make([]*Entry, 0, limit) + err := m.roTxn(func(s *xorm.Session) error { + iCursor := offset + if cursor != nil { + iCursor = cursor.(int) + } + + var ids []int64 + if err := s.Table(&edge{}).Cols("id").Where("parent = ?", inode).Limit(limit, iCursor).Find(&ids); err != nil { + return err + } + + s = s.Table(&edge{}).In("jfs_edge.id", ids).OrderBy("jfs_edge.name") + if plus { + s = s.Join("INNER", &node{}, "jfs_edge.inode=jfs_node.inode").Cols("jfs_edge.name", "jfs_node.*") + } else { + s = s.Cols("jfs_edge.inode", "jfs_edge.name", "jfs_edge.type") + } + var nodes []namedNode + if err := s.Find(&nodes); err != nil { + return err + } + + for _, n := range nodes { + if len(n.Name) == 0 { + logger.Errorf("Corrupt entry with empty name: inode %d parent %d", n.Inode, inode) + continue + } + entry := &Entry{ + Inode: n.Inode, 
+ Name: n.Name, + Attr: &Attr{}, + } + if plus { + m.parseAttr(&n.node, entry.Attr) + } else { + entry.Attr.Typ = n.Type + } + entries = append(entries, entry) + } + return nil + }) + if err != nil { + return nil, nil, err + } + return offset + len(entries), entries, nil + } +} diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 357bcfd1e97f..e74e0797acbb 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -415,6 +415,26 @@ func (m *kvMeta) scanValues(prefix []byte, limit int, filter func(k, v []byte) b return values, err } +func (m *kvMeta) scan(startKey, endKey []byte, limit int, filter func(k, v []byte) bool) ([][]byte, [][]byte, error) { + if limit == 0 { + return nil, nil, nil + } + var keys, vals [][]byte + err := m.client.txn(func(tx *kvTxn) error { + var c int + tx.scan(startKey, endKey, false, func(k, v []byte) bool { + if filter == nil || filter(k, v) { + keys = append(keys, k) + vals = append(vals, v) + c++ + } + return limit < 0 || c < limit + }) + return nil + }, 0) + return keys, vals, err +} + func (m *kvMeta) doInit(format *Format, force bool) error { body, err := m.get(m.fmtKey("setting")) if err != nil { @@ -1775,6 +1795,30 @@ func (m *kvMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) }, parent)) } +func (m *kvMeta) fillAttr(entries []*Entry) (err error) { + if len(entries) == 0 { + return nil + } + var keys = make([][]byte, len(entries)) + for i, e := range entries { + keys[i] = m.inodeKey(e.Inode) + } + var rs [][]byte + err = m.client.txn(func(tx *kvTxn) error { + rs = tx.gets(keys...) + return nil + }, 0) + if err != nil { + return err + } + for j, re := range rs { + if re != nil { + m.parseAttr(re, entries[j].Attr) + } + } + return err +} + func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno { // TODO: handle big directory vals, err := m.scanValues(m.entryKey(inode, ""), limit, nil) @@ -1796,30 +1840,10 @@ func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry } if plus != 0 && len(*entries) != 0 { - fillAttr := func(es []*Entry) error { - var keys = make([][]byte, len(es)) - for i, e := range es { - keys[i] = m.inodeKey(e.Inode) - } - var rs [][]byte - err := m.client.txn(func(tx *kvTxn) error { - rs = tx.gets(keys...) 
- return nil - }, 0) - if err != nil { - return err - } - for j, re := range rs { - if re != nil { - m.parseAttr(re, es[j].Attr) - } - } - return nil - } batchSize := 4096 nEntries := len(*entries) if nEntries <= batchSize { - err = fillAttr(*entries) + err = m.fillAttr(*entries) } else { indexCh := make(chan []*Entry, 10) var wg sync.WaitGroup @@ -1828,7 +1852,7 @@ func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry go func() { defer wg.Done() for es := range indexCh { - if e := fillAttr(es); e != nil { + if e := m.fillAttr(es); e != nil { err = e break } @@ -3756,3 +3780,89 @@ func (m *kvMeta) loadDumpedACLs(ctx Context) error { return nil }) } + +type kvDirHandler struct { + dirHandler +} + +func (h *kvDirHandler) Delete(name string) { + h.Lock() + defer h.Unlock() + h.dirHandler.delete(name) +} + +func (m *kvMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler { + s := &kvDirHandler{ + dirHandler: dirHandler{ + inode: inode, + plus: plus, + initEntries: entries, + fetcher: m.getDirFetcher(), + batchNum: DirBatchNum["kv"], + }, + } + s.batch, _ = s.fetch(Background, 0) + return s +} + +func (m *kvMeta) getDirFetcher() dirFetcher { + return func(ctx Context, inode Ino, cursor interface{}, offset, limit int, plus bool) (interface{}, []*Entry, error) { + var startKey []byte + sCursor := "" + var total int + if cursor == nil { + if offset > 0 { + total += offset + } + } else { + limit += 1 // skip the cursor + sCursor = cursor.(string) + } + total += limit + startKey = m.entryKey(inode, sCursor) + endKey := nextKey(m.entryKey(inode, "")) + + keys, vals, err := m.scan(startKey, endKey, total, nil) + if err != nil { + return nil, nil, err + } + + if cursor != nil { + keys, vals = keys[1:], vals[1:] + } + + if total > limit && offset <= len(keys) { + keys, vals = keys[offset:], vals[offset:] + } + + prefix := len(m.entryKey(inode, "")) + entries := make([]*Entry, 0, len(keys)) + var name []byte + var typ uint8 + var ino Ino + for i, buf := range vals { + name = keys[i] + typ, ino = m.parseEntry(buf) + if len(name) == prefix { + logger.Errorf("Corrupt entry with empty name: inode %d parent %d", ino, inode) + continue + } + entries = append(entries, &Entry{ + Inode: ino, + Name: []byte(name)[prefix:], + Attr: &Attr{Typ: typ}, + }) + } + + if plus { + if err = m.fillAttr(entries); err != nil { + return nil, nil, err + } + } + + if len(entries) == 0 { + return nil, nil, nil + } + return string(entries[len(entries)-1].Name), entries, nil + } +} diff --git a/pkg/vfs/backup_test.go b/pkg/vfs/backup_test.go index 0a89c9e9e2ce..583f954d8117 100644 --- a/pkg/vfs/backup_test.go +++ b/pkg/vfs/backup_test.go @@ -68,7 +68,7 @@ func TestRotate(t *testing.T) { } func TestBackup(t *testing.T) { - v, blob := createTestVFS(nil) + v, blob := createTestVFS(nil, "") go Backup(v.Meta, blob, time.Millisecond*100, false) time.Sleep(time.Millisecond * 100) diff --git a/pkg/vfs/fill_test.go b/pkg/vfs/fill_test.go index 4422bb4d95ce..6da86d04e447 100644 --- a/pkg/vfs/fill_test.go +++ b/pkg/vfs/fill_test.go @@ -24,7 +24,7 @@ import ( ) func TestFill(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.Background) entry, _ := v.Mkdir(ctx, 1, "test", 0777, 022) fe, fh, _ := v.Create(ctx, entry.Inode, "file", 0644, 0, uint32(os.O_WRONLY)) diff --git a/pkg/vfs/handle.go b/pkg/vfs/handle.go index aff941c5bdf6..fc56d45e59f3 100644 --- a/pkg/vfs/handle.go +++ b/pkg/vfs/handle.go @@ -35,10 +35,8 @@ type handle struct { fh uint64 // 
for dir - children []*meta.Entry - readAt time.Time - readOff int - index map[string]int + dirHandler meta.DirHandler + readAt time.Time // for file flags uint32 @@ -219,6 +217,10 @@ func (v *VFS) releaseHandle(inode Ino, fh uint64) { hs := v.handles[inode] for i, f := range hs { if f.fh == fh { + if hs[i].dirHandler != nil { + hs[i].dirHandler.Close() + hs[i].dirHandler = nil + } if i+1 < len(hs) { hs[i] = hs[len(hs)-1] } @@ -268,26 +270,11 @@ func (v *VFS) invalidateDirHandle(parent Ino, name string, inode Ino, attr *Attr v.hanleM.Unlock() for _, h := range hs { h.Lock() - if h.children != nil && h.index != nil { + if h.dirHandler != nil { if inode > 0 { - h.children = append(h.children, &meta.Entry{ - Inode: inode, - Name: []byte(name), - Attr: attr, - }) - h.index[name] = len(h.children) - 1 + h.dirHandler.Insert(inode, name, attr) } else { - i, ok := h.index[name] - if ok { - delete(h.index, name) - h.children[i].Inode = 0 // invalid - if i >= h.readOff { - // not read yet, remove it - h.children[i] = h.children[len(h.children)-1] - h.index[string(h.children[i].Name)] = i - h.children = h.children[:len(h.children)-1] - } - } + h.dirHandler.Delete(name) } } h.Unlock() diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go index 43dfa200230a..e72256d03ed0 100644 --- a/pkg/vfs/vfs.go +++ b/pkg/vfs/vfs.go @@ -424,45 +424,31 @@ func (v *VFS) Readdir(ctx Context, ino Ino, size uint32, off int, fh uint64, plu h.Lock() defer h.Unlock() - if h.children == nil || off == 0 { - var inodes []*meta.Entry - h.readAt = time.Now() - err = v.Meta.Readdir(ctx, ino, 1, &inodes) - if err == syscall.EACCES { - err = v.Meta.Readdir(ctx, ino, 0, &inodes) - } - if err != 0 { - return + if h.dirHandler == nil || off == 0 { + if h.dirHandler != nil { + h.dirHandler.Close() + h.dirHandler = nil } + var initEntries []*meta.Entry if ino == rootID && !v.Conf.HideInternal { - // add internal nodes for _, node := range internalNodes[1:] { - inodes = append(inodes, &meta.Entry{ + initEntries = append(initEntries, &meta.Entry{ Inode: node.inode, Name: []byte(node.name), Attr: node.attr, }) } } - if v.Conf.Meta.SortDir { - sort.SliceStable(inodes[2:], func(i, j int) bool { - return string(inodes[i+2].Name) < string(inodes[j+2].Name) - }) - } - h.children = inodes - - index := make(map[string]int) - for i, e := range inodes { - index[string(e.Name)] = i + h.readAt = time.Now() + if h.dirHandler, err = v.Meta.NewDirHandler(ctx, ino, plus, initEntries); err != 0 { + return } - h.index = index } - if off < len(h.children) { - entries = h.children[off:] - // we don't know how much of them will be sent, assume all of them - h.readOff = len(h.children) - 1 + if entries, err = h.dirHandler.List(ctx, off); err != 0 { + return } readAt = h.readAt + logger.Debugf("readdir: [%d:%d] %d entries, offset=%d", ino, fh, len(entries), off) return } @@ -473,7 +459,9 @@ func (v *VFS) UpdateReaddirOffset(ctx Context, ino Ino, fh uint64, off int) { } h.Lock() defer h.Unlock() - h.readOff = off + if h.dirHandler != nil { + h.dirHandler.Read(off) + } } func (v *VFS) Releasedir(ctx Context, ino Ino, fh uint64) int { diff --git a/pkg/vfs/vfs_test.go b/pkg/vfs/vfs_test.go index 4d688e871bfd..3e69dd2198e1 100644 --- a/pkg/vfs/vfs_test.go +++ b/pkg/vfs/vfs_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "log" + "math/rand" "reflect" "slices" "strings" @@ -34,19 +35,23 @@ import ( "github.com/juicedata/juicefs/pkg/object" "github.com/juicedata/juicefs/pkg/utils" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" 
"golang.org/x/sys/unix" ) // nolint:errcheck -func createTestVFS(applyMetaConfOption func(metaConfig *meta.Config)) (*VFS, object.ObjectStorage) { +func createTestVFS(applyMetaConfOption func(metaConfig *meta.Config), metaUri string) (*VFS, object.ObjectStorage) { mp := "/jfs" metaConf := meta.DefaultConf() metaConf.MountPoint = mp if applyMetaConfOption != nil { applyMetaConfOption(metaConf) } - m := meta.NewClient("memkv://", metaConf) + if metaUri == "" { + metaUri = "memkv://" + } + m := meta.NewClient(metaUri, metaConf) format := &meta.Format{ Name: "test", UUID: uuid.New().String(), @@ -82,7 +87,7 @@ func createTestVFS(applyMetaConfOption func(metaConfig *meta.Config)) (*VFS, obj } func TestVFSBasic(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.NewContext(10, 1, []uint32{2, 3})) if st, e := v.StatFS(ctx, 1); e != 0 { @@ -191,7 +196,7 @@ func TestVFSBasic(t *testing.T) { } func TestVFSIO(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.Background) fe, fh, e := v.Create(ctx, 1, "file", 0755, 0, syscall.O_RDWR) if e != 0 { @@ -356,7 +361,7 @@ func TestVFSIO(t *testing.T) { } func TestVFSXattrs(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.Background) fe, e := v.Mkdir(ctx, 1, "xattrs", 0755, 0) if e != 0 { @@ -498,7 +503,7 @@ func TestSetattrStr(t *testing.T) { } func TestVFSLocks(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.Background) fe, fh, e := v.Create(ctx, 1, "flock", 0644, 0, syscall.O_RDWR) if e != 0 { @@ -599,7 +604,7 @@ func TestVFSLocks(t *testing.T) { } func TestInternalFile(t *testing.T) { - v, _ := createTestVFS(nil) + v, _ := createTestVFS(nil, "") ctx := NewLogContext(meta.Background) // list internal files fh, _ := v.Opendir(ctx, 1, 0) @@ -869,26 +874,54 @@ func TestInternalFile(t *testing.T) { } func TestReaddirCache(t *testing.T) { - v, _ := createTestVFS(nil) + engines := map[string]string{ + "kv": "", + "db": "sqlite3://", + "redis": "redis://127.0.0.1:6379/2", + } + for typ, metaUri := range engines { + testReaddirCache(t, metaUri, typ, 20) + testReaddirCache(t, metaUri, typ, 4096) + } +} + +func testReaddirCache(t *testing.T, metaUri string, typ string, batchNum int) { + old := meta.DirBatchNum + meta.DirBatchNum[typ] = batchNum + defer func() { + meta.DirBatchNum = old + }() + + v, _ := createTestVFS(nil, metaUri) ctx := NewLogContext(meta.Background) + entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) if st != 0 { t.Fatalf("mkdir testdir: %s", st) } parent := entry.Inode - for i := 0; i < 100; i++ { + for i := 0; i <= 100; i++ { _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) } + + defer func() { + for i := 0; i <= 120; i++ { + _ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i)) + } + _ = v.Rmdir(ctx, 1, "testdir") + }() + fh, _ := v.Opendir(ctx, parent, 0) - _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", 100), 0777, 022) defer v.Releasedir(ctx, parent, fh) - var off = 20 + initNum, num := 2, 20 var files = make(map[string]bool) // read first 20 - entries, _, _ := v.Readdir(ctx, parent, 20, 0, fh, true) - for _, e := range entries[:off] { + entries, _, _ := v.Readdir(ctx, parent, 20, initNum, fh, true) + for _, e := range entries[:num] { files[string(e.Name)] = true } + + off := num + initNum v.UpdateReaddirOffset(ctx, parent, fh, off) for i := 0; i < 100; i += 10 { name := fmt.Sprintf("d%d", i) @@ -939,9 +972,15 @@ func 
TestReaddirCache(t *testing.T) { } func TestVFSReadDirSort(t *testing.T) { + for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} { + testVFSReadDirSort(t, metaUri) + } +} + +func testVFSReadDirSort(t *testing.T, metaUri string) { v, _ := createTestVFS(func(metaConfig *meta.Config) { metaConfig.SortDir = true - }) + }, metaUri) ctx := NewLogContext(meta.Background) entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) if st != 0 { @@ -951,6 +990,12 @@ func TestVFSReadDirSort(t *testing.T) { for i := 0; i < 100; i++ { _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) } + defer func() { + for i := 0; i < 100; i++ { + _ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i)) + } + _ = v.Rmdir(ctx, 1, "testdir") + }() fh, _ := v.Opendir(ctx, parent, 0) entries1, _, _ := v.Readdir(ctx, parent, 60, 10, fh, true) sorted := slices.IsSortedFunc(entries1, func(i, j *meta.Entry) int { @@ -970,3 +1015,138 @@ func TestVFSReadDirSort(t *testing.T) { } v.Releasedir(ctx, parent, fh2) } + +func testReaddirBatch(t *testing.T, metaUri string, typ string, batchNum int) { + old := meta.DirBatchNum + meta.DirBatchNum[typ] = batchNum + defer func() { + meta.DirBatchNum = old + }() + + n, extra := 5, 40 + + v, _ := createTestVFS(nil, metaUri) + ctx := NewLogContext(meta.Background) + + entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) + if st != 0 { + t.Fatalf("mkdir testdir: %s", st) + } + + parent := entry.Inode + for i := 0; i < n*batchNum+extra; i++ { + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) + } + defer func() { + for i := 0; i < n*batchNum+extra; i++ { + _ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i)) + } + v.Rmdir(ctx, 1, "testdir") + }() + + fh, _ := v.Opendir(ctx, parent, 0) + defer v.Releasedir(ctx, parent, fh) + entries1, _, _ := v.Readdir(ctx, parent, 0, 0, fh, true) + require.NotNil(t, entries1) + require.Equal(t, 2+batchNum, len(entries1)) // init entries: "." and ".." 
+ + entries2, _, _ := v.Readdir(ctx, parent, 0, 2, fh, true) + require.NotNil(t, entries2) + require.Equal(t, batchNum, len(entries2)) + + entries3, _, _ := v.Readdir(ctx, parent, 0, 2+batchNum, fh, true) + require.NotNil(t, entries3) + require.Equal(t, batchNum, len(entries3)) + + // reach the end + entries4, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+extra+2, fh, true) + require.NotNil(t, entries4) + require.Equal(t, 0, len(entries4)) + + // skip-style readdir + entries5, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+2, fh, true) + require.NotNil(t, entries5) + require.Equal(t, extra, len(entries5)) + + entries6, _, _ := v.Readdir(ctx, parent, 0, 2, fh, true) + require.Equal(t, len(entries2), len(entries6)) + for i := 0; i < len(entries2); i++ { + require.Equal(t, entries2[i].Inode, entries6[i].Inode) + } + + // dir seak + entries7, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+2-20, fh, true) + require.True(t, reflect.DeepEqual(entries5, entries7[20:])) +} + +func TestReadDirSteaming(t *testing.T) { + engines := map[string]string{ + "kv": "", + "db": "sqlite3://", + "redis": "redis://127.0.0.1:6379/2", + } + for typ, metaUri := range engines { + testReaddirBatch(t, metaUri, typ, 100) + testReaddirBatch(t, metaUri, typ, 4096) + } +} + +func TestReaddir(t *testing.T) { + engines := map[string]string{ + "kv": "", + "db": "sqlite3://", + "redis": "redis://127.0.0.1:6379/2", + } + for typ, metaUri := range engines { + batchNum := meta.DirBatchNum[typ] + extra := rand.Intn(batchNum) + testReaddir(t, metaUri, 20, 0) + testReaddir(t, metaUri, 20, 5) + testReaddir(t, metaUri, 2*batchNum, 0) + testReaddir(t, metaUri, 2*batchNum, extra) + testReaddir(t, metaUri, 4*batchNum, 0) + testReaddir(t, metaUri, 4*batchNum, 2*batchNum+extra) + } +} + +func testReaddir(t *testing.T, metaUri string, dirNum int, offset int) { + v, _ := createTestVFS(nil, metaUri) + ctx := NewLogContext(meta.Background) + + entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) + if st != 0 { + t.Fatalf("mkdir testdir: %s", st) + } + + parent := entry.Inode + for i := 0; i < dirNum; i++ { + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) + } + defer func() { + for i := 0; i < dirNum; i++ { + _ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i)) + } + v.Rmdir(ctx, 1, "testdir") + }() + + fh, _ := v.Opendir(ctx, parent, 0) + defer v.Releasedir(ctx, parent, fh) + + readAll := func(ctx Context, parent Ino, fh uint64, off int) []*meta.Entry { + var entries []*meta.Entry + for { + ents, _, st := v.Readdir(ctx, parent, 0, off, fh, true) + require.Equal(t, st, syscall.Errno(0)) + if len(ents) == 0 { + break + } + off += len(ents) + entries = append(entries, ents...) + } + return entries + } + + entriesOne := readAll(ctx, parent, fh, offset) + entriesTwo := readAll(ctx, parent, fh, offset) + require.True(t, reflect.DeepEqual(entriesOne, entriesTwo)) +}
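Engine wiring sketch (illustrative, not part of the patch): sql.go and tkv.go reuse the shared dirHandler from base.go and only supply a dirFetcher, which receives the previous opaque cursor plus an offset/limit and returns the next cursor together with one page of entries. A hypothetical in-memory engine, assumed to live inside pkg/meta next to the code above, could plug in as sketched below; memDirFetcher and newMemDirHandler are made-up names.

// memDirFetcher pages through a pre-sorted slice with an integer cursor, the same
// contract the db fetcher uses (the kv fetcher resumes from the last returned name).
func memDirFetcher(all []*Entry) dirFetcher {
	return func(ctx Context, inode Ino, cursor interface{}, offset, limit int, plus bool) (interface{}, []*Entry, error) {
		start := offset
		if cursor != nil {
			start = cursor.(int) // resume right after the previous page
		}
		if start >= len(all) {
			return nil, nil, nil // end of directory: no entries, fetch() keeps the old cursor
		}
		end := start + limit
		if end > len(all) {
			end = len(all)
		}
		return end, all[start:end], nil // next cursor is the absolute index to resume from
	}
}

// newMemDirHandler mirrors kvMeta.newDirHandler: the generic dirHandler does the
// batching, caching and offset bookkeeping; only the fetcher is engine specific.
func newMemDirHandler(inode Ino, plus bool, init, all []*Entry) DirHandler {
	h := &kvDirHandler{dirHandler: dirHandler{
		inode:       inode,
		plus:        plus,
		initEntries: init,
		fetcher:     memDirFetcher(all),
		batchNum:    DirBatchNum["kv"], // reuse the kv batch size for the sketch
	}}
	h.batch, _ = h.fetch(Background, 0)
	return h
}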