From ed97cba972d6e8a3f572cb2843e92373860e8ca2 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 22 Aug 2024 17:56:50 +0800 Subject: [PATCH] feature: Enable to remove checkpoints when remove orphan channel (#298) Signed-off-by: yangxuan --- states/etcd/common/channel.go | 8 ++++++++ states/etcd/remove/channel.go | 22 +++++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/states/etcd/common/channel.go b/states/etcd/common/channel.go index 2cc21d21..3f765d96 100644 --- a/states/etcd/common/channel.go +++ b/states/etcd/common/channel.go @@ -39,6 +39,14 @@ func ListChannelWatchV2(cli clientv3.KV, basePath string, filters ...func(channe return ListProtoObjects(ctx, cli, prefix, filters...) } +func ListChannelCheckpint(cli clientv3.KV, basePath string, filters ...func(pos *internalpb.MsgPosition) bool) ([]internalpb.MsgPosition, []string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + prefix := path.Join(basePath, "datacoord-meta", "channel-cp") + "/" + return ListProtoObjects(ctx, cli, prefix, filters...) +} + // ListChannelWatch lists channel watch info meta. func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.ChannelWatch) bool) ([]*models.ChannelWatch, error) { prefix := path.Join(basePath, "channelwatch") + "/" diff --git a/states/etcd/remove/channel.go b/states/etcd/remove/channel.go index a4d5f061..b22b2b22 100644 --- a/states/etcd/remove/channel.go +++ b/states/etcd/remove/channel.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" @@ -59,7 +60,18 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - targets := make([]string, 0, len(paths)) + orphanCps, cpPaths, err := common.ListChannelCheckpint(cli, basePath, func(pos *internalpb.MsgPosition) bool { + if len(channelName) > 0 { + return pos.GetChannelName() == channelName + } + return true + }) + if err != nil { + fmt.Println(err.Error()) + return + } + + targets := make([]string, 0, len(paths)+len(cpPaths)) for i, watchChannel := range watchChannels { _, ok := validChannels[watchChannel.GetVchan().GetChannelName()] if !ok || force { @@ -68,6 +80,14 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { } } + for i, orphanCp := range orphanCps { + _, ok := validChannels[orphanCp.GetChannelName()] + if !ok || force { + fmt.Printf("%s selected as target orpah checkpoint\n", orphanCp.GetChannelName()) + targets = append(targets, cpPaths[i]) + } + } + if !run { return }