Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: support dir stream #5162

Merged
merged 13 commits into from
Oct 8, 2024
183 changes: 183 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package meta

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -47,6 +48,11 @@ const (
)

var (
DirBatchNum = map[string]int{
"redis": 4096,
"kv": 4096,
"db": 40960,
}
maxCompactSlices = 1000
maxSlices = 2500
inodeNeedPrefetch = uint64(utils.JitterIt(inodeBatch * 0.1)) // Add jitter to reduce probability of txn conflicts
Expand Down Expand Up @@ -125,6 +131,8 @@ type engine interface {
doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno
doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, rule *aclAPI.Rule) syscall.Errno
cacheACLs(ctx Context) error

newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler
}

type trashSliceScan func(ss []Slice, ts int64) (clean bool, err error)
Expand Down Expand Up @@ -2889,3 +2897,178 @@ func inGroup(ctx Context, gid uint32) bool {
}
return false
}

type DirHandler interface {
List(ctx Context, offset int) ([]*Entry, syscall.Errno)
Insert(inode Ino, name string, attr *Attr)
Delete(name string)
Read(offset int)
Close()
}

func (m *baseMeta) NewDirHandler(ctx Context, inode Ino, plus bool, initEntries []*Entry) (DirHandler, syscall.Errno) {
var attr Attr
var st syscall.Errno
defer func() {
if st == 0 {
m.touchAtime(ctx, inode, &attr)
}
}()

inode = m.checkRoot(inode)
if st = m.GetAttr(ctx, inode, &attr); st != 0 {
return nil, st
}
defer m.timeit("NewDirHandler", time.Now())
var mmask uint8 = MODE_MASK_R
if plus {
mmask |= MODE_MASK_X
}

if st = m.Access(ctx, inode, mmask, &attr); st != 0 {
return nil, st
}
if inode == m.root {
attr.Parent = m.root
}

initEntries = append(initEntries, &Entry{
Inode: inode,
Name: []byte("."),
Attr: &Attr{Typ: TypeDirectory},
}, &Entry{
Inode: attr.Parent,
Name: []byte(".."),
Attr: &Attr{Typ: TypeDirectory},
})

return m.en.newDirHandler(inode, plus, initEntries), 0
}

type dirBatch struct {
isEnd bool
offset int
cursor interface{}
maxName []byte
entries []*Entry
indexes map[string]int
}

func (b *dirBatch) contain(offset int) bool {
if b == nil {
return false
}
return b.offset <= offset && offset < b.offset+len(b.entries) || (len(b.entries) == 0 && b.offset == offset)
}

func (b *dirBatch) predecessor(offset int) bool {
return b.offset+len(b.entries) == offset
}

type dirFetcher func(ctx Context, inode Ino, cursor interface{}, offset, limit int, plus bool) (interface{}, []*Entry, error)

type dirHandler struct {
sync.Mutex
inode Ino
plus bool
initEntries []*Entry
batch *dirBatch
fetcher dirFetcher
readOff int
batchNum int
}

func (h *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) {
var cursor interface{}
if h.batch != nil && h.batch.predecessor(offset) {
if h.batch.isEnd {
return h.batch, nil
}
cursor = h.batch.cursor
}
nextCursor, entries, err := h.fetcher(ctx, h.inode, cursor, offset, h.batchNum, h.plus)
if err != nil {
return nil, err
}
if entries == nil {
entries = []*Entry{}
nextCursor = cursor
}
indexes := make(map[string]int, len(entries))
maxName := []byte("")
for i, e := range entries {
indexes[string(e.Name)] = i
if bytes.Compare(e.Name, maxName) > 0 {
maxName = e.Name
}
}
return &dirBatch{isEnd: len(entries) < h.batchNum, offset: offset, cursor: nextCursor, entries: entries, indexes: indexes, maxName: maxName}, nil
}

func (h *dirHandler) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
var prefix []*Entry
if offset < len(h.initEntries) {
prefix = h.initEntries[offset:]
offset = 0
} else {
offset -= len(h.initEntries)
}

var err error
h.Lock()
defer h.Unlock()
if !h.batch.contain(offset) {
h.batch, err = h.fetch(ctx, offset)
}

if err != nil {
return nil, errno(err)
}

h.readOff = h.batch.offset + len(h.batch.entries)
if len(prefix) > 0 {
return append(prefix, h.batch.entries...), 0
}
return h.batch.entries[offset-h.batch.offset:], 0
}

func (h *dirHandler) delete(name string) {
if h.batch == nil || len(h.batch.entries) == 0 {
return
}

if idx, ok := h.batch.indexes[name]; ok && idx >= h.readOff {
delete(h.batch.indexes, name)
n := len(h.batch.entries)
if idx < n-1 {
// TODO: sorted
h.batch.entries[idx] = h.batch.entries[n-1]
h.batch.indexes[string(h.batch.entries[idx].Name)] = idx
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
}
h.batch.entries = h.batch.entries[:n-1]
}
}

func (h *dirHandler) Insert(inode Ino, name string, attr *Attr) {
h.Lock()
defer h.Unlock()
if h.batch == nil {
return
}
if h.batch.isEnd || bytes.Compare([]byte(name), h.batch.maxName) < 0 {
// TODO: sorted
h.batch.entries = append(h.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
h.batch.indexes[name] = len(h.batch.entries) - 1
}
}

func (h *dirHandler) Read(offset int) {
h.readOff = offset - len(h.initEntries)
}

func (h *dirHandler) Close() {
h.Lock()
h.batch = nil
h.readOff = 0
h.Unlock()
}
2 changes: 2 additions & 0 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ type Meta interface {
Link(ctx Context, inodeSrc, parent Ino, name string, attr *Attr) syscall.Errno
// Readdir returns all entries for given directory, which include attributes if plus is true.
Readdir(ctx Context, inode Ino, wantattr uint8, entries *[]*Entry) syscall.Errno
// NewDirHandler returns a stream for directory entries.
NewDirHandler(ctx Context, inode Ino, plus bool, initEntries []*Entry) (DirHandler, syscall.Errno)
// Create creates a file in a directory with given name.
Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, flags uint32, inode *Ino, attr *Attr) syscall.Errno
// Open checks permission on a node and track it as open.
Expand Down
Loading
Loading