Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add indexfile process commands #117

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions states/autocomplete/auto_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autocomplete

import (
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -39,6 +40,9 @@ func (c *cmdCandidate) args() []acCandidate {
if name == "file" {
result = append(result, &fileCandidate{previousCandidates: []acCandidate{}})
}
if name == "directory" {
result = append(result, &fileCandidate{previousCandidates: []acCandidate{}, validator: func(info os.FileInfo) bool { return info.IsDir() }})
}
}
}
return result
Expand Down
4 changes: 4 additions & 0 deletions states/autocomplete/auto_complete_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type fileCandidate struct {
previousCandidates []acCandidate
validator func(file os.FileInfo) bool
}

func (c *fileCandidate) Match(input cComp) bool {
Expand Down Expand Up @@ -63,6 +64,9 @@ func (c *fileCandidate) Suggest(target cComp) map[string]string {
result := make(map[string]string)
for _, f := range fs {
if strings.HasPrefix(f.Name(), part) {
if c.validator != nil && !c.validator(f) {
continue
}
result[path.Join(d, f.Name())] = ""
}
}
Expand Down
12 changes: 12 additions & 0 deletions states/load_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,15 @@ func testFile(file string) error {
}
return nil
}

func testFolder(folder string) error {
fi, err := os.Stat(folder)
if err != nil {
return err
}

if !fi.IsDir() {
return errors.Newf("path is not dir %s", folder)
}
return nil
}
338 changes: 338 additions & 0 deletions states/parse_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
package states

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/storage"
"github.com/spf13/cobra"
)

func GetParseIndexParamCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "parse-indexparam [file]",
Short: "parse index params",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("should provide only one file path")
return
}
f, err := openBackupFile(args[0])
if err != nil {
fmt.Println(err.Error())
return
}
defer f.Close()

r, evt, err := storage.NewIndexReader(f)
if err != nil {
fmt.Println(err.Error())
return
}
extra := make(map[string]any)
json.Unmarshal(evt.ExtraBytes, &extra)
key := extra["key"].(string)
if key != "indexParams" && key != "SLICE_META" {
fmt.Println("index data file found", extra)
return
}
data, err := r.NextEventReader(f, evt.PayloadDataType)
if err != nil {
fmt.Println(err.Error())
return
}

if len(data) != 1 {
fmt.Println("event data length is not 1")
return
}

switch key {
case "indexParams":
params := make(map[string]string)
json.Unmarshal(data[0], &params)
fmt.Println(params)
case "SLICE_META":
fmt.Println(string(data[0]))
}

},
}
return cmd
}

func GetValidateIndexFilesCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "validate-indexfiles [directory]",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("should provide only one file path")
return
}

folder := args[0]
if err := testFolder(folder); err != nil {
fmt.Println(err.Error())
return
}

filepath.WalkDir(folder, func(fp string, d fs.DirEntry, err error) error {
if d.IsDir() {
idxParam := path.Join(fp, "indexParams")
if info, err := os.Stat(idxParam); err == nil {
if !info.IsDir() {
bs, err := readIndexFile(idxParam, func(key string) bool {
return key == "indexParams"
})
if err != nil {
return nil
}
params := make(map[string]string)
json.Unmarshal(bs, &params)
indexType := params["index_type"]
fmt.Printf("Path:[%s] IndexParam file found, index type is %s\n", fp, indexType)
validateIndexFolder(fp, params)
}
} else if errors.Is(err, os.ErrNotExist) {
// file not exist
} else {
fmt.Println(err.Error())
}
}
return nil
})
},
}
return cmd
}

func validateIndexFolder(fp string, params map[string]string) {
fmt.Println(params)
indexType := params["index_type"]
var indexSize int64
var dataSize int64
filepath.WalkDir(fp, func(file string, d os.DirEntry, err error) error {
switch indexType {
case "":
fallthrough
case "STL_SORT":
//fmt.Println(file, d.Name())
switch d.Name() {
case "index_length":
bs, err := readIndexFile(file, func(key string) bool { return key == "index_length" })
if err != nil {
fmt.Println(err.Error())
return nil
}
indexSize = int64(binary.LittleEndian.Uint64(bs))
case "index_data":
bs, err := readIndexFile(file, func(key string) bool { return key == "index_data" })
if err != nil {
fmt.Println(err.Error())
return nil
}
dataSize = int64(len(bs))
}
case "Trie":
switch d.Name() {
case "marisa_trie_index":
bs, err := readIndexFile(file, func(key string) bool { return key == "marisa_trie_index" })
if err != nil {
fmt.Println(err.Error())
return nil
}
fmt.Printf("%s: size %d\n", d.Name(), len(bs))
case "marisa_trie_str_ids":
bs, err := readIndexFile(file, func(key string) bool { return key == "marisa_trie_str_ids" })
if err != nil {
fmt.Println(err.Error())
return nil
}
fmt.Printf("%s: size %d\n", d.Name(), len(bs))
}
}

return nil
})

switch indexType {
case "":
fallthrough
case "STL_SORT":
fmt.Printf("indexSize: %d, dataSize:%d, multipler: %f\n", indexSize, dataSize, float64(dataSize)/float64(indexSize))
}

}

func GetAssembleIndexFilesCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "assemble-indexfiles [directory]",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("should provide only one file path")
return
}

folder := args[0]
if err := testFolder(folder); err != nil {
fmt.Println(err.Error())
return
}

sliceMetaFile := path.Join(folder, "SLICE_META")
prefix, num, err := tryParseSliceMeta(sliceMetaFile)
if err != nil {
fmt.Println("failed to parse SLICE_META", err.Error())
return
}

fmt.Printf("original file name: %s, slice num: %d\n", prefix, num)

m := make(map[int64]struct{})

filepath.Walk(folder, func(file string, info os.FileInfo, err error) error {
file = path.Base(file)
if !strings.HasPrefix(file, prefix+"_") {
fmt.Println("skip file", file)
return nil
}

suffix := file[len(prefix)+1:]
idx, err := strconv.ParseInt(suffix, 10, 64)
if err != nil {
fmt.Println(err.Error())
return nil
}

m[idx] = struct{}{}
return nil
})
if len(m) != num {
fmt.Println("slice files not complete", m)
return
}

outputPath := fmt.Sprintf("%s_%s", prefix, time.Now().Format("060102150406"))
output, err := os.OpenFile(outputPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
fmt.Println(err.Error())
return
}
defer output.Close()
totalLen := int64(0)

for i := 0; i < num; i++ {
key := fmt.Sprintf("%s_%d", prefix, i)
fmt.Print("processing file:", key)
data, err := readIndexFile(path.Join(folder, key), func(metaKey string) bool {
return metaKey == key
})
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println(" read data size:", len(data), hrSize(int64(len(data))))

_, err = output.Write(data)
if err != nil {
fmt.Println(err.Error())
return
}
totalLen += int64(len(data))
}
fmt.Printf("index file write to %s success, total len %d\n", outputPath, totalLen)
},
}
return cmd
}

func hrSize(size int64) string {
sf := float64(size)
units := []string{"Bytes", "KB", "MB", "GB"}
idx := 0
for sf > 1024.0 && idx < 3 {
sf /= 1024.0
idx++
}
return fmt.Sprintf("%f %s", sf, units[idx])
}

func tryParseSliceMeta(file string) (string, int, error) {
data, err := readIndexFile(file, func(key string) bool {
if key != "SLICE_META" {
fmt.Println("failed meta indicates file content not SLICE_META but", key)
return false
}
return true
})
if err != nil {
fmt.Println(err.Error())
return "", 0, err
}
meta := &SliceMeta{}
raw := bytes.Trim(data, "\x00")
err = json.Unmarshal(raw, meta)
if err != nil {
fmt.Println("failed to unmarshal", err.Error())
return "", 0, err
}

if len(meta.Meta) != 1 {
return "", 0, errors.Newf("slice_meta item is not 1 but %d", len(meta.Meta))
}

fmt.Printf("SLICE_META total_num parsed: %d\n", meta.Meta[0].TotalLength)
return meta.Meta[0].Name, meta.Meta[0].SliceNum, nil
}

type SliceMeta struct {
Meta []struct {
Name string `json:"name"`
SliceNum int `json:"slice_num"`
TotalLength int64 `json:"total_len"`
} `json:"meta"`
}

func readIndexFile(file string, validKey func(key string) bool) ([]byte, error) {
if err := testFile(file); err != nil {
fmt.Println("failed to test file", file)
return nil, err
}

f, err := openBackupFile(file)
if err != nil {
return nil, err
}
defer f.Close()

r, evt, err := storage.NewIndexReader(f)
if err != nil {
return nil, err
}
extra := make(map[string]any)
json.Unmarshal(evt.ExtraBytes, &extra)
key := extra["key"].(string)
if !validKey(key) {
return nil, errors.New("file meta key not valid")
}

data, err := r.NextEventReader(f, evt.PayloadDataType)
if err != nil {
return nil, err
}
if len(data) != 1 {
return nil, errors.Newf("index file suppose to contain only one block but got %d", len(data))
}

return data[0], nil
}
Loading