Skip to content

Commit

Permalink
Add remove leaked collection meta cmd (#222)
Browse files Browse the repository at this point in the history
/kind improvement

Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 authored Dec 1, 2023
1 parent b08fb97 commit 0b09362
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 4 deletions.
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func RemoveCommand(cli kv.MetaKV, instanceName, basePath string) *cobra.Command
remove.SegmentCollectionDroppedCommand(cli, basePath),
// remove etcd-config
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
)

return removeCmd
Expand Down
3 changes: 3 additions & 0 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
)

const (
SnapshotPrefix = "snapshots"
// CollectionMetaPrefix is prefix for rootcoord collection meta.
CollectionMetaPrefix = `root-coord/collection`
// DBCollectionMetaPrefix is prefix for rootcoord database collection meta
DBCollectionMetaPrefix = `root-coord/database/collection-info`
// FieldMetaPrefix is prefix for rootcoord collection fields meta
FieldMetaPrefix = `root-coord/fields`
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
Expand Down
9 changes: 5 additions & 4 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
)

const (
segmentMetaPrefix = "datacoord-meta/s"
SegmentMetaPrefix = "datacoord-meta/s"
SegmentStatsMetaPrefix = "datacoord-meta/statslog"
)

// ListSegmentsVersion list segment info as specified version.
func ListSegmentsVersion(ctx context.Context, cli kv.MetaKV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) {
prefix := path.Join(basePath, segmentMetaPrefix) + "/"
prefix := path.Join(basePath, SegmentMetaPrefix) + "/"
switch version {
case models.LTEVersion2_1:
segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix)
Expand Down Expand Up @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli kv.MetaKV, basePath string, segment datapbv2.Segment
func ListSegments(cli kv.MetaKV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, segmentMetaPrefix)+"/")
_, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, SegmentMetaPrefix)+"/")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -308,7 +309,7 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll

func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/"
prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix)
if err != nil {
return err
Expand Down
122 changes: 122 additions & 0 deletions states/etcd/remove/collection_clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package remove

import (
"context"
"fmt"
"path"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/spf13/cobra"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/milvus-io/birdwatcher/states/kv"
)

var paginationSize = 2000

type ExcludePrefixOptions func(string) bool

// CollectionCleanCommand returns command to remove
func CollectionCleanCommand(cli kv.MetaKV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "collection-meta-leaked",
Short: "Remove leaked collection meta for collection has been dropped",
Run: func(cmd *cobra.Command, args []string) {
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion())
if err != nil {
fmt.Println(err.Error())
return
}

id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) {
fmt.Printf("existing collectionID %v\n", col.ID)
return strconv.FormatInt(col.ID, 10), col
})

cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error {
return cli.WalkWithPrefix(ctx, prefix, paginationSize, func(k []byte, v []byte) error {
sKey := string(k)
for _, opt := range opts {
if opt(sKey) {
return nil
}
}

key := sKey[len(prefix):]
ss := strings.Split(key, "/")
collectionExist := false
for _, s := range ss {
if _, ok := id2Collection[s]; ok {
collectionExist = true
}
}

if !collectionExist {
fmt.Println("clean meta key ", sKey)
if run {
return cli.Remove(ctx, sKey)
}
}

return nil
})
}

// remove collection meta
// meta before database
collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix)
// with database
dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix)
// remove collection field meta
fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix)
fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix)
// remove collection partition meta
partitionsPrefix := path.Join(basePath, common.PartitionPrefix)
partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix)
prefixes := []string{
collectionMetaPrefix,
dbCollectionMetaPrefix,
fieldsPrefix,
fieldsSnapShotPrefix,
partitionsPrefix,
partitionsSnapShotPrefix}

for _, prefix := range prefixes {
fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix)
err = cleanMetaFn(context.TODO(), prefix)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix)
}

// remove segment meta
segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix)
segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix)
fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix)
err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool {
return strings.HasPrefix(key, segmentStatsPrefix)
})
if err != nil {
fmt.Println(err.Error())
return
}

fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix)
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command")
return cmd
}
76 changes: 76 additions & 0 deletions states/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"strings"
"time"

Expand Down Expand Up @@ -50,6 +51,7 @@ type MetaKV interface {
removeWithPrefixAndPrevKV(ctx context.Context, prefix string) ([]*mvccpb.KeyValue, error)
GetAllRootPath(ctx context.Context) ([]string, error)
BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision bool, batchSize int64) error
WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error
Close()
}

Expand Down Expand Up @@ -206,6 +208,8 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
meta["instance"] = instance
meta["metaPath"] = metaPath

fmt.Println("meta path ", metaPath)

bs, _ := json.Marshal(meta)
ph := models.PartHeader{
PartType: int32(models.EtcdBackup),
Expand Down Expand Up @@ -271,6 +275,39 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
return nil
}

func (kv *etcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)

batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}

// Close closes the connection to etcd.
func (kv *etcdKV) Close() {
kv.client.Close()
Expand Down Expand Up @@ -588,6 +625,45 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
return txn.Commit(ctx)
}

func (kv *txnTiKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)

// Since only reading, use Snapshot for less overhead
ss := kv.client.GetSnapshot(MaxSnapshotTS)
ss.SetScanBatchSize(paginationSize)

// Retrieve key-value pairs with the specified prefix
startKey := []byte(prefix)
endKey := tikv.PrefixNextKey([]byte(prefix))
iter, err := ss.Iter(startKey, endKey)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during WalkWithPrefix", prefix))
return err
}
defer iter.Close()

// Iterate over the key-value pairs
for iter.Valid() {
// Grab value for empty check
byteVal := iter.Value()
// Check if empty val and replace with placeholder
if isEmptyByte(byteVal) {
byteVal = []byte{}
}
err = fn(iter.Key(), byteVal)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to apply fn to (%s;%s)", string(iter.Key()), string(byteVal)))
return err
}
err = iter.Next()
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for WalkWithPrefix", string(iter.Key())))
return err
}
}
return nil
}

// Close closes the connection to TiKV.
func (kv *txnTiKV) Close() {
kv.client.Close()
Expand Down
4 changes: 4 additions & 0 deletions states/kv/kv_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,7 @@ func (c *FileAuditKV) writeData(data []byte) {
}
}
}

func (c *FileAuditKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return c.cli.WalkWithPrefix(ctx, prefix, paginationSize, fn)
}

0 comments on commit 0b09362

Please sign in to comment.