Skip to content

Commit

Permalink
Support backup index info and load state
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jan 12, 2023
1 parent f804a71 commit ce366e7
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 170 deletions.
113 changes: 109 additions & 4 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,37 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
AutoID: completeCollection.Schema.AutoID,
Fields: fields,
}

hasIndex := false
var indexInfo *backuppb.IndexInfo
for _, field := range completeCollection.Schema.Fields {
if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector {
continue
}

indexState, err := b.milvusClient.GetIndexState(b.ctx, collection.Name, field.Name)
if err != nil {
log.Error("fail in GetIndexState", zap.Error(err))
return backupInfo, err
}
if indexState == 0 {
continue
}
fieldIndex, err := b.milvusClient.DescribeIndex(b.ctx, collection.Name, field.Name)
if err != nil {
log.Error("fail in DescribeIndex", zap.Error(err))
return backupInfo, err
}
if len(fieldIndex) != 0 {
indexInfo = &backuppb.IndexInfo{
Name: fieldIndex[0].Name(),
IndexType: string(fieldIndex[0].IndexType()),
Params: fieldIndex[0].Params(),
}
hasIndex = true
}
}

collectionBackupId := utils.UUID()
collectionBackup := &backuppb.CollectionBackupInfo{
Id: collectionBackupId,
Expand All @@ -335,6 +366,8 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
Schema: schema,
ShardsNum: completeCollection.ShardNum,
ConsistencyLevel: backuppb.ConsistencyLevel(completeCollection.ConsistencyLevel),
HasIndex: hasIndex,
IndexInfo: indexInfo,
}
collectionBackupInfos = append(collectionBackupInfos, collectionBackup)
}
Expand All @@ -353,13 +386,51 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
return backupInfo, err
}

// use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515
collectionLoadProgress, err := b.milvusClient.GetLoadingProgress(ctx, collection.GetCollectionName(), []string{})
if err != nil {
log.Error("fail to GetLoadingProgress of collection", zap.Error(err))
return backupInfo, err
}

var collectionLoadState string
partitionLoadStates := make(map[string]string, 0)
if collectionLoadProgress == 0 {
collectionLoadState = LoadState_NotLoad
for _, partition := range partitions {
partitionLoadStates[partition.Name] = LoadState_NotLoad
}
} else if collectionLoadProgress == 100 {
collectionLoadState = LoadState_Loaded
for _, partition := range partitions {
partitionLoadStates[partition.Name] = LoadState_Loaded
}
} else {
collectionLoadState = LoadState_Loading
for _, partition := range partitions {
loadProgress, err := b.milvusClient.GetLoadingProgress(ctx, collection.GetCollectionName(), []string{partition.Name})
if err != nil {
log.Error("fail to GetLoadingProgress of partition", zap.Error(err))
return backupInfo, err
}
if loadProgress == 0 {
partitionLoadStates[partition.Name] = LoadState_NotLoad
} else if loadProgress == 100 {
partitionLoadStates[partition.Name] = LoadState_Loaded
} else {
partitionLoadStates[partition.Name] = LoadState_Loading
}
}
}

// Flush
newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.milvusClient.Flush(ctx, collection.GetCollectionName(), false)
if err != nil {
log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName()))
return backupInfo, err
}
log.Info("flush segments",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs),
zap.Int64s("flushedSegmentIDs", flushedSegmentIDs),
zap.Int64("timeOfSeal", timeOfSeal))
Expand All @@ -370,6 +441,9 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
if err != nil {
return backupInfo, err
}
log.Info("GetPersistentSegmentInfo from milvus",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int("segmentNum", len(segmentEntities)))

checkSegmentsFunc := func(flushSegmentIds []int64, segmentEntities []*entity.Segment) ([]*entity.Segment, error) {
segmentDict := utils.ArrayToMap(flushSegmentIds)
Expand All @@ -396,6 +470,10 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
collection.ErrorMessage = err.Error()
return backupInfo, err
}
log.Info("Finished segment check",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int("before check", len(segmentEntities)),
zap.Int("after check", len(checkedSegments)))

segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo)
Expand All @@ -411,6 +489,9 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
segmentBackupInfos = append(segmentBackupInfos, segmentInfo)
segmentLevelBackupInfos = append(segmentLevelBackupInfos, segmentInfo)
}
log.Info("readSegmentInfo from storage",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int("segmentNum", len(checkedSegments)))

leveledBackupInfo.segmentLevel = &backuppb.SegmentLevelBackupInfo{
Infos: segmentLevelBackupInfos,
Expand All @@ -428,6 +509,7 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
CollectionId: collection.GetCollectionId(),
SegmentBackups: partSegInfoMap[partition.ID],
Size: size,
LoadState: partitionLoadStates[partition.Name],
}
partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo)
partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo)
Expand All @@ -437,10 +519,19 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
Infos: partitionLevelBackupInfos,
}
collection.PartitionBackups = partitionBackupInfos
collection.LoadState = collectionLoadState
refreshBackupMetaFunc(id, leveledBackupInfo)
log.Info("finish build partition info",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int("partitionNum", len(partitionBackupInfos)))

log.Info("Begin copy data",
zap.String("collectionName", collection.GetCollectionName()),
zap.Int("segmentNum", len(segmentBackupInfos)))

// copy segment data
for _, segment := range segmentBackupInfos {
start := time.Now().Unix()
log.Debug("copy segment",
zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
Expand Down Expand Up @@ -522,6 +613,12 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}
}
}
duration := time.Now().Unix() - start
log.Debug("copy segment finished",
zap.Int64("collection_id", segment.GetCollectionId()),
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("cost_time", duration))
}
refreshBackupMetaFunc(id, leveledBackupInfo)
}
Expand Down Expand Up @@ -638,6 +735,13 @@ func (b BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBacku
}
}

log.Info("finish GetBackupRequest",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.String("backupId", request.GetBackupId()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.Any("resp", resp))
return resp
}

Expand Down Expand Up @@ -682,9 +786,11 @@ func (b BackupContext) ListBackups(ctx context.Context, request *backuppb.ListBa
log.Warn("Fail to read backup",
zap.String("path", backupPath),
zap.String("error", backupResp.GetMsg()))
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = backupResp.Msg
return resp
// ignore get failed
continue
//resp.Code = backuppb.ResponseCode_Fail
//resp.Msg = backupResp.Msg
//return resp
}

// 2, list wanted backup
Expand Down Expand Up @@ -949,7 +1055,6 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
}
}

// wip
func (b BackupContext) executeRestoreBackupTask(ctx context.Context, backupBucketName string, backupPath string, backup *backuppb.BackupInfo, task *backuppb.RestoreBackupTask) (*backuppb.RestoreBackupTask, error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ const (
INSERT_LOG_DIR = "insert_log"
DELTA_LOG_DIR = "delta_log"
STATS_LOG_DIR = "stats_log"

LoadState_NotExist = "NotExist"
LoadState_NotLoad = "NotLoad"
LoadState_Loading = "Loading"
LoadState_Loaded = "Loaded"
)

type BackupMetaBytes struct {
Expand Down Expand Up @@ -52,6 +57,9 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
ConsistencyLevel: collectionBack.GetConsistencyLevel(),
BackupTimestamp: collectionBack.GetBackupTimestamp(),
Size: collectionBack.GetSize(),
HasIndex: collectionBack.GetHasIndex(),
IndexInfo: collectionBack.GetIndexInfo(),
LoadState: collectionBack.GetLoadState(),
}
collections = append(collections, cloneCollectionBackup)

Expand All @@ -61,6 +69,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
PartitionName: partitionBack.GetPartitionName(),
CollectionId: partitionBack.GetCollectionId(),
Size: partitionBack.GetSize(),
LoadState: partitionBack.GetLoadState(),
}
partitions = append(partitions, clonePartitionBackupInfo)

Expand Down Expand Up @@ -250,6 +259,9 @@ func SimpleBackupResponse(input *backuppb.BackupInfoResponse) *backuppb.BackupIn
ErrorMessage: coll.GetErrorMessage(),
CollectionName: coll.GetCollectionName(),
BackupTimestamp: coll.GetBackupTimestamp(),
HasIndex: coll.GetHasIndex(),
IndexInfo: coll.GetIndexInfo(),
LoadState: coll.GetLoadState(),
})
}
simpleBackupInfo := &backuppb.BackupInfo{
Expand Down
27 changes: 27 additions & 0 deletions core/milvus_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -125,3 +126,29 @@ func TestBulkInsert(t *testing.T) {

client.DropCollection(ctx, _COLLECTION_NAME)
}

// TestGetIndex is a smoke test against a locally running Milvus instance
// (localhost:19530). It exercises the index/load-state read APIs used by the
// backup flow: DescribeCollection, DescribeIndex, GetIndexState,
// GetLoadingProgress and GetLoadState, printing each result for inspection.
//
// NOTE: requires a live Milvus with a "hello_milvus_recover" collection that
// has an index on the "embeddings" field; it will fail/skip otherwise.
func TestGetIndex(t *testing.T) {
	ctx := context.Background()
	milvusAddr := "localhost:19530"
	c, err := gomilvus.NewGrpcClient(ctx, milvusAddr)
	assert.NoError(t, err)
	// release the grpc connection when the test finishes
	defer c.Close()

	coll, err := c.DescribeCollection(ctx, "hello_milvus_recover")
	// check the error before indexing into the schema, otherwise a failed
	// DescribeCollection panics on the nil result instead of failing the test
	assert.NoError(t, err)
	fmt.Println(coll.Schema.Fields[0])
	fmt.Println(coll.Schema.Fields[1])
	fmt.Println(coll.Schema.Fields[2])

	index, err := c.DescribeIndex(ctx, "hello_milvus_recover", "embeddings")
	assert.NoError(t, err)
	fmt.Println(index)

	// NOTE(review): this queries "hello_milvus" while every other call uses
	// "hello_milvus_recover" — confirm whether that is intentional.
	indexState, err := c.GetIndexState(ctx, "hello_milvus", "embeddings")
	assert.NoError(t, err)
	fmt.Println(indexState)

	progress, err := c.GetLoadingProgress(ctx, "hello_milvus_recover", []string{})
	assert.NoError(t, err)
	fmt.Println(progress)

	loadState, err := c.GetLoadState(ctx, "hello_milvus_recover", []string{})
	assert.NoError(t, err)
	fmt.Println(loadState)
}
14 changes: 12 additions & 2 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ enum ResponseCode {
Request_Object_Not_Found = 404;
}

/**
 * Index metadata captured at backup time so the index can be recreated
 * on restore. Populated from Milvus DescribeIndex for vector fields.
 */
message IndexInfo {
// index name as reported by Milvus
string name = 1;
// index type, e.g. the string form of the Milvus index type enum
string index_type = 2;
// index build parameters (key/value), as returned by DescribeIndex
map<string, string> params = 3;
}

/**
* lite version of Collection info
*/
Expand All @@ -46,15 +52,19 @@ message CollectionBackupInfo {
repeated PartitionBackupInfo partition_backups = 13;
uint64 backup_timestamp = 14;
int64 size = 15;
bool has_index = 16;
IndexInfo index_info = 17;
string load_state = 18;
}

message PartitionBackupInfo {
int64 partition_id = 1;
string partition_name = 2;
int64 collection_id = 3;
// array of segment backup
repeated SegmentBackupInfo segment_backups = 7;
int64 size = 8;
repeated SegmentBackupInfo segment_backups = 4;
int64 size = 5;
string load_state = 6;
}

/**
Expand Down
Loading

0 comments on commit ce366e7

Please sign in to comment.