Skip to content

Commit

Permalink
feat: hdfs glob (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang authored Sep 1, 2023
1 parent 61c3b00 commit f0347ba
Show file tree
Hide file tree
Showing 9 changed files with 538 additions and 134 deletions.
212 changes: 106 additions & 106 deletions docs/configuration-reference.md

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions pkg/config/base/source.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package configbase

import (
"io/fs"
"os"

"github.com/vesoft-inc/nebula-importer/v4/pkg/reader"
"github.com/vesoft-inc/nebula-importer/v4/pkg/source"
)
Expand Down Expand Up @@ -33,3 +36,37 @@ func (s *Source) BuildSourceAndReader(opts ...reader.Option) (
brr := reader.NewBatchRecordReader(rr, opts...)
return src, brr, nil
}

func (s *Source) Glob() ([]*Source, bool, error) {
sourceConfig := s.SourceConfig
src, err := sourceNew(&sourceConfig)
if err != nil {
return nil, false, err
}

g, ok := src.(source.Globber)
if !ok {
// Do not support glob.
return nil, false, nil
}
defer src.Close()

cs, err := g.Glob()
if err != nil {
return nil, true, err
}

if len(cs) == 0 {
return nil, true, &os.PathError{Op: "open", Path: src.Name(), Err: fs.ErrNotExist}
}

ss := make([]*Source, 0, len(cs))
for _, c := range cs {
cpy := *s
cpySourceConfig := c.Clone()
cpy.SourceConfig = *cpySourceConfig
ss = append(ss, &cpy)
}

return ss, true, nil
}
149 changes: 149 additions & 0 deletions pkg/config/base/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,153 @@ var _ = Describe("Source", func() {
Expect(brr).To(BeNil())
})
})

Describe(".Glob", func() {
var (
s *Source
ctrl *gomock.Controller
mockSource *source.MockSource
mockGlobber *source.MockGlobber
patches *gomonkey.Patches
)
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
mockSource = source.NewMockSource(ctrl)
mockGlobber = source.NewMockGlobber(ctrl)
patches = gomonkey.NewPatches()
s = &Source{
SourceConfig: source.Config{
Local: &source.LocalConfig{
Path: "path*",
},
CSV: &source.CSVConfig{
Delimiter: ",",
},
},
Batch: 7,
}
})
AfterEach(func() {
ctrl.Finish()
patches.Reset()
})

It("failed", func() {
patches.ApplyGlobalVar(&sourceNew, func(_ *source.Config) (source.Source, error) {
return nil, stderrors.New("test error")
})
ss, isSupportGlob, err := s.Glob()
Expect(err).To(HaveOccurred())
Expect(isSupportGlob).To(Equal(false))
Expect(ss).To(BeNil())
})

It("unsupported", func() {
patches.ApplyGlobalVar(&sourceNew, func(_ *source.Config) (source.Source, error) {
return mockSource, nil
})
ss, isSupportGlob, err := s.Glob()
Expect(err).NotTo(HaveOccurred())
Expect(isSupportGlob).To(Equal(false))
Expect(ss).To(BeNil())
})

It("failed at glob", func() {
patches.ApplyGlobalVar(&sourceNew, func(_ *source.Config) (source.Source, error) {
return struct {
*source.MockSource
*source.MockGlobber
}{
MockSource: mockSource,
MockGlobber: mockGlobber,
}, nil
})
mockGlobber.EXPECT().Glob().Return(nil, stderrors.New("test error"))
mockSource.EXPECT().Close().Return(nil)

ss, isSupportGlob, err := s.Glob()
Expect(err).To(HaveOccurred())
Expect(isSupportGlob).To(Equal(true))
Expect(ss).To(BeNil())
})

It("glob return empty", func() {
patches.ApplyGlobalVar(&sourceNew, func(_ *source.Config) (source.Source, error) {
return struct {
*source.MockSource
*source.MockGlobber
}{
MockSource: mockSource,
MockGlobber: mockGlobber,
}, nil
})
mockGlobber.EXPECT().Glob().Return(nil, nil)
mockSource.EXPECT().Name().AnyTimes().Return("source name")
mockSource.EXPECT().Close().Return(nil)

ss, isSupportGlob, err := s.Glob()
Expect(err).To(HaveOccurred())
Expect(isSupportGlob).To(Equal(true))
Expect(ss).To(BeNil())
})

It("glob return many", func() {
patches.ApplyGlobalVar(&sourceNew, func(_ *source.Config) (source.Source, error) {
return struct {
*source.MockSource
*source.MockGlobber
}{
MockSource: mockSource,
MockGlobber: mockGlobber,
}, nil
})
mockGlobber.EXPECT().Glob().Return([]*source.Config{
{
Local: &source.LocalConfig{
Path: "path1",
},
CSV: &source.CSVConfig{
Delimiter: ",",
},
},
{
Local: &source.LocalConfig{
Path: "path2",
},
CSV: &source.CSVConfig{
Delimiter: ",",
},
},
}, nil)
mockSource.EXPECT().Close().Return(nil)

ss, isSupportGlob, err := s.Glob()
Expect(err).NotTo(HaveOccurred())
Expect(isSupportGlob).To(Equal(true))
Expect(ss).To(Equal([]*Source{
{
SourceConfig: source.Config{
Local: &source.LocalConfig{
Path: "path1",
},
CSV: &source.CSVConfig{
Delimiter: ",",
},
},
Batch: 7,
},
{
SourceConfig: source.Config{
Local: &source.LocalConfig{
Path: "path2",
},
CSV: &source.CSVConfig{
Delimiter: ",",
},
},
Batch: 7,
},
}))
})
})
})
26 changes: 10 additions & 16 deletions pkg/config/v3/source.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package configv3

import (
"io/fs"
"os"
"path/filepath"

"github.com/vesoft-inc/nebula-importer/v4/pkg/client"
Expand Down Expand Up @@ -78,24 +76,20 @@ func (ss Sources) OptimizePath(configPath string) error {
func (ss *Sources) OptimizePathWildCard() error {
nss := make(Sources, 0, len(*ss))
for i := range *ss {
if (*ss)[i].SourceConfig.Local != nil {
paths, err := filepath.Glob((*ss)[i].SourceConfig.Local.Path)
if err != nil {
return err
}
if len(paths) == 0 {
return &os.PathError{Op: "open", Path: (*ss)[i].SourceConfig.Local.Path, Err: fs.ErrNotExist}
}
ssCpy := (*ss)[i]

for _, path := range paths {
cpy := (*ss)[i]
cpySourceConfig := cpy.SourceConfig.Clone()
cpy.SourceConfig = *cpySourceConfig
cpy.SourceConfig.Local.Path = path
baseSources, isSupportGlob, err := (*ss)[i].Glob()
if err != nil {
return err
}
if isSupportGlob {
for j := range baseSources {
cpy := ssCpy
cpy.Source = *baseSources[j]
nss = append(nss, cpy)
}
} else {
nss = append(nss, (*ss)[i])
nss = append(nss, ssCpy)
}
}
*ss = nss
Expand Down
98 changes: 98 additions & 0 deletions pkg/source/glob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package source

import (
"path/filepath"
"sort"
"strings"
)

type sourceGlobInterface interface {
IsDir(dir string) (isDir bool, err error)
Readdirnames(dir string) ([]string, error)
}

func sourceGlob(g sourceGlobInterface, pattern string) (matches []string, err error) {
return sourceGlobWithLimit(g, pattern, 0)
}

func sourceGlobWithLimit(g sourceGlobInterface, pattern string, depth int) (matches []string, err error) {
// This limit is used prevent stack exhaustion issues. See CVE-2022-30632.
const pathSeparatorsLimit = 10000
if depth == pathSeparatorsLimit {
return nil, filepath.ErrBadPattern
}

// Check pattern is well-formed.
if _, err = filepath.Match(pattern, ""); err != nil {
return nil, err
}
if !sourceGlobHas(pattern) {
return []string{pattern}, nil
}

dir, file := filepath.Split(pattern)
dir = cleanGlobPath(dir)

if !sourceGlobHas(dir) {
return sourceGlobDir(g, dir, file, nil)
}

// Prevent infinite recursion.
if dir == pattern {
return nil, filepath.ErrBadPattern
}

var m []string
m, err = sourceGlobWithLimit(g, dir, depth+1)
if err != nil {
return nil, err
}
for _, d := range m {
matches, err = sourceGlobDir(g, d, file, matches)
if err != nil {
return nil, err
}
}
return matches, nil
}

// cleanGlobPath prepares path for glob matching.
func cleanGlobPath(path string) string {
switch path {
case "":
return "."
case string(filepath.Separator):
// do nothing to the path
return path
default:
return path[0 : len(path)-1] // chop off trailing separator
}
}

func sourceGlobDir(g sourceGlobInterface, dir, pattern string, matches []string) (m []string, e error) {
m = matches
if isDir, err := g.IsDir(dir); err != nil || !isDir {
return m, err
}

names, err := g.Readdirnames(dir)
if err != nil {
return nil, err
}
sort.Strings(names)

for _, n := range names {
matched, err := filepath.Match(pattern, n)
if err != nil {
return m, err
}
if matched {
m = append(m, filepath.Join(dir, n))
}
}
return m, nil
}

func sourceGlobHas(path string) bool {
return strings.ContainsAny(path, `*?[\`)
}
Loading

0 comments on commit f0347ba

Please sign in to comment.