Skip to content

Commit

Permalink
enhance: Support repair channel-watch command to set empty schema (#…
Browse files Browse the repository at this point in the history
…305)

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 23, 2024
1 parent 87f5d7f commit 89193d8
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 23 deletions.
40 changes: 27 additions & 13 deletions models/channel_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"github.com/samber/lo"

datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.2/schemapb"
)

Expand All @@ -12,9 +13,15 @@ type ChannelWatch struct {
State ChannelWatchState
TimeoutTs int64

// 2.4 only
Progress int32
OpID int64

// key
key string
Schema CollectionSchema
Schema *CollectionSchema

VchanV2Pb *datapbv2.VchannelInfo
}

func (c *ChannelWatch) Key() string {
Expand Down Expand Up @@ -57,29 +64,36 @@ func GetChannelWatchInfo[ChannelWatchBase interface {
}

func GetChannelWatchInfoV2[ChannelWatchBase interface {
GetVchan() vchan
GetVchan() *datapbv2.VchannelInfo
GetStartTs() int64
GetState() watchState
GetTimeoutTs() int64
GetSchema() *schemapb.CollectionSchema
}, watchState ~int32, vchan interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch {
schema := newSchemaFromBase(info.GetSchema())
schema.Fields = lo.Map(info.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})
GetProgress() int32
GetOpID() int64
}, watchState ~int32, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch {
var schema *CollectionSchema
if info.GetSchema() != nil {
m := newSchemaFromBase(info.GetSchema())
schema = &m
schema.Fields = lo.Map(info.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})
}

return &ChannelWatch{
Vchan: getVChannelInfo[vchan, pos](info.GetVchan()),
Vchan: getVChannelInfo(info.GetVchan()),
StartTs: info.GetStartTs(),
State: ChannelWatchState(info.GetState()),
TimeoutTs: info.GetTimeoutTs(),
key: key,
Schema: schema,
Progress: info.GetProgress(),
OpID: info.GetOpID(),

VchanV2Pb: info.GetVchan(),
}
}

Expand Down
3 changes: 3 additions & 0 deletions models/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Collection struct {
Properties map[string]string
DBID int64

CollectionPBv2 *schemapbv2.CollectionSchema

// etcd collection key
key string

Expand Down Expand Up @@ -114,6 +116,7 @@ func NewCollectionFromV2_2(info *etcdpbv2.CollectionInfo, key string, fields []*
schema := info.GetSchema()
schema.Fields = fields
c.Schema = newSchemaFromBase(schema)
c.CollectionPBv2 = schema

c.Schema.Fields = lo.Map(fields, func(fieldSchema *schemapbv2.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapbv2.FieldSchema, schemapbv2.DataType](fieldSchema)
Expand Down
3 changes: 3 additions & 0 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd"
"github.com/milvus-io/birdwatcher/states/etcd/remove"
"github.com/milvus-io/birdwatcher/states/etcd/repair"
"github.com/milvus-io/birdwatcher/states/etcd/show"
)

Expand All @@ -32,6 +33,7 @@ type embedEtcdMockState struct {
cmdState
*show.ComponentShow
*remove.ComponentRemove
*repair.ComponentRepair
client *clientv3.Client
server *embed.Etcd
instanceName string
Expand Down Expand Up @@ -90,6 +92,7 @@ func (s *embedEtcdMockState) SetInstance(instanceName string) {
rootPath := path.Join(instanceName, metaPath)
s.ComponentShow = show.NewComponent(s.client, s.config, rootPath)
s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath)
s.ComponentRepair = repair.NewComponent(s.client, s.config, rootPath)
s.SetupCommands()
}

Expand Down
5 changes: 3 additions & 2 deletions states/etcd/common/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,16 @@ func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, ver
return nil, err
}
result = lo.Map(infos, func(info datapb.ChannelWatchInfo, idx int) *models.ChannelWatch {
return models.GetChannelWatchInfo[*datapb.ChannelWatchInfo, datapb.ChannelWatchState, *datapb.VchannelInfo, *internalpb.MsgPosition](&info, paths[idx])
result := models.GetChannelWatchInfo[*datapb.ChannelWatchInfo, datapb.ChannelWatchState, *datapb.VchannelInfo, *internalpb.MsgPosition](&info, paths[idx])
return result
})
case models.GTEVersion2_2:
infos, paths, err := ListProtoObjects[datapbv2.ChannelWatchInfo](ctx, cli, prefix)
if err != nil {
return nil, err
}
result = lo.Map(infos, func(info datapbv2.ChannelWatchInfo, idx int) *models.ChannelWatch {
return models.GetChannelWatchInfoV2[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *msgpbv2.MsgPosition](&info, paths[idx])
return models.GetChannelWatchInfoV2[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *msgpbv2.MsgPosition](&info, paths[idx])
})
default:
return nil, errors.New("version not supported")
Expand Down
30 changes: 30 additions & 0 deletions states/etcd/common/channel_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package common

import (
"context"

"github.com/golang/protobuf/proto"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/models"
datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
schemapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb"
)

func WriteChannelWatchInfo(ctx context.Context, cli clientv3.KV, basePath string, info *models.ChannelWatch, schema *schemapbv2.CollectionSchema) error {
pb := &datapbv2.ChannelWatchInfo{
Vchan: info.VchanV2Pb,
StartTs: info.StartTs,
State: datapbv2.ChannelWatchState(info.State),
TimeoutTs: info.TimeoutTs,
Schema: schema, // use passed schema
Progress: info.Progress,
OpID: info.OpID,
}
bs, err := proto.Marshal(pb)
if err != nil {
return err
}
_, err = cli.Put(ctx, info.Key(), string(bs))
return err
}
15 changes: 10 additions & 5 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ func ListCollectionsVersion(ctx context.Context, cli clientv3.KV, basePath strin

// GetCollectionByIDVersion retruns collection info from etcd with provided version & id.
func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, collID int64) (*models.Collection, error) {
var result []*mvccpb.KeyValue

// meta before database
var legacy []*mvccpb.KeyValue
prefix := path.Join(basePath, CollectionMetaPrefix, strconv.FormatInt(collID, 10))
resp, err := cli.Get(ctx, prefix)
if err != nil {
fmt.Println("get error", err.Error())
return nil, err
}
result = append(result, resp.Kvs...)
legacy = append(legacy, resp.Kvs...)

// with database, dbID unknown here
var result []*mvccpb.KeyValue
prefix = path.Join(basePath, DBCollectionMetaPrefix)
resp, _ = cli.Get(ctx, prefix, clientv3.WithPrefix())
suffix := strconv.FormatInt(collID, 10)
Expand All @@ -142,11 +142,16 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str
}
}

if len(result) != 1 {
if len(legacy)+len(result) == 0 {
return nil, fmt.Errorf("collection %d not found in etcd %w", collID, ErrCollectionNotFound)
}

kv := result[0]
var kv *mvccpb.KeyValue
if len(result) > 0 {
kv = result[0]
} else {
kv = legacy[0]
}

if bytes.Equal(kv.Value, CollectionTombstone) {
return nil, fmt.Errorf("%w, collection id: %d", ErrCollectionDropped, collID)
Expand Down
7 changes: 6 additions & 1 deletion states/etcd/common/list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"bytes"
"context"
"fmt"

Expand All @@ -26,7 +27,11 @@ LOOP:
info := P(&elem)
err = proto.Unmarshal(kv.Value, info)
if err != nil {
fmt.Println(err.Error())
if bytes.Equal(kv.Value, []byte{0xE2, 0x9B, 0xBC}) {
fmt.Printf("Tombstone found, key: %s\n", string(kv.Key))
continue
}
fmt.Printf("failed to unmarshal key=%s, err: %s\n", string(kv.Key), err.Error())
continue
}

Expand Down
103 changes: 103 additions & 0 deletions states/etcd/repair/channel_watched.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package repair

import (
"context"
"fmt"
"sort"
"strings"

"github.com/pkg/errors"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

type ChannelWatchedParam struct {
framework.ParamBase `use:"repair channel-watch"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to repair"`
ChannelName string `name:"vchannel" default:"" desc:"channel name to repair"`
Run bool `name:"run" default:"false" desc:"whether to remove legacy collection meta, default set to \"false\" to dry run"`
}

func (c *ComponentRepair) RepairChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) error {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) &&
(p.ChannelName == "" || channel.Vchan.ChannelName == p.ChannelName)
})
if err != nil {
return errors.Wrap(err, "failed to list channel watch info")
}

var targets []*models.ChannelWatch

for _, info := range infos {
if info.Schema == nil {
targets = append(targets, info)
}
}

if len(targets) == 0 {
fmt.Println("No empty schema watch info found")
return nil
}

for _, info := range targets {
fmt.Println("=================================================================")
fmt.Printf("Watch info with empty schema found, channel name = %s, key = %s", info.Vchan.ChannelName, info.Key())

collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), info.Vchan.CollectionID)
if err != nil {
fmt.Println("failed to get collection schema: ", err.Error())
}
sb := &strings.Builder{}
info.Schema = &collection.Schema
printSchema(sb, info)
fmt.Println("Collection schema found, about to set schema as:")
fmt.Println(sb.String())
if p.Run {
err := common.WriteChannelWatchInfo(ctx, c.client, c.basePath, info, collection.CollectionPBv2)
if err != nil {
fmt.Println("failed to write modified channel watch info, err: ", err.Error())
continue
}
fmt.Println("Modified channel watch info written!")
}
}

return nil
}

func printSchema(sb *strings.Builder, info *models.ChannelWatch) {
fmt.Fprintf(sb, "Fields:\n")
fields := info.Schema.Fields
sort.Slice(fields, func(i, j int) bool {
return fields[i].FieldID < fields[j].FieldID
})
for _, field := range fields {
fmt.Fprintf(sb, " - Field ID: %d \t Field Name: %s \t Field Type: %s\n", field.FieldID, field.Name, field.DataType.String())
if field.IsPrimaryKey {
fmt.Fprintf(sb, "\t - Primary Key: %t, AutoID: %t\n", field.IsPrimaryKey, field.AutoID)
}
if field.IsDynamic {
fmt.Fprintf(sb, "\t - Dynamic Field\n")
}
if field.IsPartitionKey {
fmt.Fprintf(sb, "\t - Partition Key\n")
}
if field.IsClusteringKey {
fmt.Fprintf(sb, "\t - Clustering Key\n")
}
// print element type if field is array
if field.DataType == models.DataTypeArray {
fmt.Fprintf(sb, "\t - Element Type: %s\n", field.ElementType.String())
}
// type params
for key, value := range field.Properties {
fmt.Fprintf(sb, "\t - Type Param %s: %s\n", key, value)
}
}

fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema)
}
Loading

0 comments on commit 89193d8

Please sign in to comment.