Skip to content

Commit

Permalink
support batch remove segment meta with filters
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Sep 25, 2024
1 parent b09603e commit b2fd4f7
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 60 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ require (
github.com/rs/xid v1.2.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
Expand Down
61 changes: 61 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"time"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -22,6 +23,8 @@ const (
SegmentStatsMetaPrefix = "datacoord-meta/statslog"
)

var ErrReachMaxNumOfWalkSegment = errors.New("reach max number of the walked segments")

// ListSegmentsVersion list segment info as specified version.
func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) {
prefix := path.Join(basePath, SegmentMetaPrefix) + "/"
Expand Down Expand Up @@ -332,3 +335,61 @@ func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, colle
}
return nil
}

// WalkAllSegments walk all segment info from etcd with func
func WalkAllSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool, op func(*datapb.SegmentInfo) error, limit int64) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

cnt := int64(0)
return WalkWithPrefix(ctx, cli, path.Join(basePath, SegmentMetaPrefix)+"/", 1000, func(k []byte, v []byte) error {
info := &datapb.SegmentInfo{}
err := proto.Unmarshal(v, info)
if err != nil {
return err
}

if filter == nil || filter(info) {
err = op(info)
if err != nil {
return err
}
cnt++
if cnt >= limit {
return ErrReachMaxNumOfWalkSegment
}
}
return nil
})
}

func WalkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := cli.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}
33 changes: 1 addition & 32 deletions states/etcd/remove/collection_clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command {
})

cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error {
return walkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error {
return common.WalkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error {
sKey := string(k)
for _, opt := range opts {
if opt(sKey) {
Expand Down Expand Up @@ -122,34 +122,3 @@ func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command")
return cmd
}

func walkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := cli.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}
98 changes: 71 additions & 27 deletions states/etcd/remove/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package remove

import (
"fmt"
"math"
"os"
"strings"
"time"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -17,9 +20,14 @@ import (
func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "segment",
Short: "Remove segment from meta with specified segment id",
Short: "Remove segment from meta with specified filters",
Run: func(cmd *cobra.Command, args []string) {
targetSegmentID, err := cmd.Flags().GetInt64("segment")
targetSegmentID, err := cmd.Flags().GetInt64("segmentID")
if err != nil {
fmt.Println(err.Error())
return
}
collectionID, err := cmd.Flags().GetInt64("collectionID")
if err != nil {
fmt.Println(err.Error())
return
Expand All @@ -30,60 +38,96 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command {
return
}

segments, err := common.ListSegments(cli, basePath, func(segmentInfo *datapb.SegmentInfo) bool {
return segmentInfo.GetID() == targetSegmentID
})
maxNum, err := cmd.Flags().GetInt64("maxNum")
if err != nil {
fmt.Println("failed to list segments", err.Error())
fmt.Println(err.Error())
return
}

if len(segments) != 1 {
fmt.Printf("failed to get segment with id %d, get %d result(s)\n", targetSegmentID, len(segments))
state, err := cmd.Flags().GetString("state")
if err != nil {
fmt.Println(err.Error())
return
}

// dry run, display segment first
if !run {
// show.PrintSegmentInfo(segments[0], false)
fmt.Printf("segment info %v", segments[0])
return
backupDir := fmt.Sprintf("segments-backup_%d", time.Now().UnixMilli())

filterFunc := func(segmentInfo *datapb.SegmentInfo) bool {
return (collectionID == 0 || segmentInfo.CollectionID == collectionID) &&
(targetSegmentID == 0 || segmentInfo.GetID() == targetSegmentID) &&
(state == "" || strings.EqualFold(segmentInfo.State.String(), state))
}

// TODO put audit log
info := segments[0]
backupSegmentInfo(info)
fmt.Println("[WARNING] about to remove segment from etcd")
err = common.RemoveSegment(cli, basePath, info)
if err != nil {
fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error())
removedCnt := 0
dryRunCount := 0
opFunc := func(info *datapb.SegmentInfo) error {
// dry run, display segment first
if !run {
dryRunCount++
fmt.Printf("dry run segment:%d collectionID:%d state:%s\n", info.ID, info.CollectionID, info.State.String())
return nil
}

if err = backupSegmentInfo(info, backupDir); err != nil {
return err
}

if err = common.RemoveSegment(cli, basePath, info); err != nil {
fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error())
return err
}

removedCnt++
fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID())
return nil
}

err = common.WalkAllSegments(cli, basePath, filterFunc, opFunc, maxNum)
if err != nil && !errors.Is(err, common.ErrReachMaxNumOfWalkSegment) {
fmt.Printf("WalkAllSegmentsfailed, err: %s\n", err.Error())
}

if !run {
fmt.Println("dry run segments, total count:", dryRunCount)
return
}
fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID())
fmt.Println("Remove segments succeeds, total count:", removedCnt)
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
cmd.Flags().Int64("segment", 0, "segment id to remove")
cmd.Flags().Int64("segmentID", 0, "segment id")
cmd.Flags().Int64("collectionID", 0, "collection id")
cmd.Flags().String("state", "", "segment state")
cmd.Flags().Int64("maxNum", math.MaxInt64, "max number of segment to remove")
return cmd
}

func backupSegmentInfo(info *datapb.SegmentInfo) {
func backupSegmentInfo(info *datapb.SegmentInfo, backupDir string) error {
if _, err := os.Stat(backupDir); errors.Is(err, os.ErrNotExist) {
err := os.MkdirAll(backupDir, os.ModePerm)
if err != nil {
fmt.Println("Failed to create folder,", err.Error())
return err
}
}

now := time.Now()
filePath := fmt.Sprintf("bw_etcd_segment_%d.%s.bak", info.GetID(), now.Format("060102-150405"))
filePath := fmt.Sprintf("%s/bw_etcd_segment_%d.%s.bak", backupDir, info.GetID(), now.Format("060102-150405"))
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600)
if err != nil {
fmt.Println("failed to open backup segment file", err.Error())
return
return err
}

defer f.Close()

bs, err := proto.Marshal(info)
if err != nil {
fmt.Println("failed to marshal backup segment", err.Error())
return
return err
}

f.Write(bs)
_, err = f.Write(bs)
return err
}

0 comments on commit b2fd4f7

Please sign in to comment.