Skip to content

Commit

Permalink
feature: Enable to remove checkpoints when remove orphan channel (#298)
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Aug 22, 2024
1 parent 7acafae commit ed97cba
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
8 changes: 8 additions & 0 deletions states/etcd/common/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func ListChannelWatchV2(cli clientv3.KV, basePath string, filters ...func(channe
return ListProtoObjects(ctx, cli, prefix, filters...)
}

func ListChannelCheckpint(cli clientv3.KV, basePath string, filters ...func(pos *internalpb.MsgPosition) bool) ([]internalpb.MsgPosition, []string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

prefix := path.Join(basePath, "datacoord-meta", "channel-cp") + "/"
return ListProtoObjects(ctx, cli, prefix, filters...)
}

// ListChannelWatch lists channel watch info meta.
func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.ChannelWatch) bool) ([]*models.ChannelWatch, error) {
prefix := path.Join(basePath, "channelwatch") + "/"
Expand Down
22 changes: 21 additions & 1 deletion states/etcd/remove/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/proto/v2.0/internalpb"
datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
Expand Down Expand Up @@ -59,7 +60,18 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
return
}

targets := make([]string, 0, len(paths))
orphanCps, cpPaths, err := common.ListChannelCheckpint(cli, basePath, func(pos *internalpb.MsgPosition) bool {
if len(channelName) > 0 {
return pos.GetChannelName() == channelName
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}

targets := make([]string, 0, len(paths)+len(cpPaths))
for i, watchChannel := range watchChannels {
_, ok := validChannels[watchChannel.GetVchan().GetChannelName()]
if !ok || force {
Expand All @@ -68,6 +80,14 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
}
}

for i, orphanCp := range orphanCps {
_, ok := validChannels[orphanCp.GetChannelName()]
if !ok || force {
fmt.Printf("%s selected as target orpah checkpoint\n", orphanCp.GetChannelName())
targets = append(targets, cpPaths[i])
}
}

if !run {
return
}
Expand Down

0 comments on commit ed97cba

Please sign in to comment.