Skip to content

Commit

Permalink
Update backup proto, prepare to support get create/load backup state (#9
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wayblink authored Nov 8, 2022
1 parent e2fdc96 commit 05fe91d
Show file tree
Hide file tree
Showing 5 changed files with 566 additions and 515 deletions.
6 changes: 4 additions & 2 deletions core/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (

type Backup interface {
// Create backuppb
CreateBackup(context.Context, *backuppb.CreateBackupRequest) (*backuppb.CreateBackupResponse, error)
CreateBackup(context.Context, *backuppb.CreateBackupRequest) (*backuppb.BackupInfoResponse, error)
// Get backuppb with the chosen name
GetBackup(context.Context, *backuppb.GetBackupRequest) (*backuppb.GetBackupResponse, error)
GetBackup(context.Context, *backuppb.GetBackupRequest) (*backuppb.BackupInfoResponse, error)
// List backups that contains the given collection name, if collection is not given, return all backups in the cluster
ListBackups(context.Context, *backuppb.ListBackupsRequest) (*backuppb.ListBackupsResponse, error)
// Delete backuppb by given backuppb name
DeleteBackup(context.Context, *backuppb.DeleteBackupRequest) (*backuppb.DeleteBackupResponse, error)
// Load backuppb to milvus, return backuppb load report
LoadBackup(context.Context, *backuppb.LoadBackupRequest) (*backuppb.LoadBackupResponse, error)

GetLoadBackupState(context.Context, *backuppb.LoadBackupRequest) (*backuppb.LoadBackupResponse, error)

// Copy backuppb between buckets
//CopyBackup(context.Context, *backuppb.CopyBackupRequest) (*backuppb.CopyBackupResponse, error)
}
40 changes: 23 additions & 17 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type BackupContext struct {
backupRootPath string
}

func (b *BackupContext) GetLoadBackupState(ctx context.Context, request *backuppb.LoadBackupRequest) (*backuppb.LoadBackupResponse, error) {
//TODO implement me
panic("implement me")
}

func (b *BackupContext) Start() error {
// start milvus go SDK client
milvusEndpoint := b.params.MilvusCfg.Address + ":" + b.params.MilvusCfg.Port
Expand Down Expand Up @@ -104,22 +109,20 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B
}
}

// todo refine error handle
// todo support get create backup progress
func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest) (*backuppb.CreateBackupResponse, error) {
func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest) (*backuppb.BackupInfoResponse, error) {
b.mu.Lock()
defer b.mu.Unlock()

if !b.started {
err := b.Start()
if err != nil {
return &backuppb.CreateBackupResponse{
return &backuppb.BackupInfoResponse{
Status: &backuppb.Status{StatusCode: backuppb.StatusCode_ConnectFailed},
}, nil
}
}

errorResp := &backuppb.CreateBackupResponse{
errorResp := &backuppb.BackupInfoResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_UnexpectedError,
},
Expand Down Expand Up @@ -318,15 +321,16 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
errorResp.Status.Reason = err.Error()
return errorResp, nil
}
completeBackupInfo.BackupStatus = backuppb.StatusCode_Success
completeBackupInfo.BackupTimestamp = uint64(time.Now().Unix())
if request.GetBackupName() == "" {
completeBackupInfo.Name = "backup_" + fmt.Sprint(time.Now().Unix())
} else {
completeBackupInfo.Name = request.BackupName
}
// todo generate ID
completeBackupInfo.Id = 0
completeBackupInfo.BackupState = &backuppb.BackupTaskState{
Id: 0,
}

// 6, copy data
for _, segment := range segmentBackupInfos {
Expand Down Expand Up @@ -429,28 +433,28 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
b.storageClient.Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, completeBackupInfo.GetName()), output.PartitionMetaBytes)
b.storageClient.Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, completeBackupInfo.GetName()), output.SegmentMetaBytes)

return &backuppb.CreateBackupResponse{
return &backuppb.BackupInfoResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_Success,
},
BackupInfo: completeBackupInfo,
}, nil
}

func (b BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) (*backuppb.GetBackupResponse, error) {
func (b BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) (*backuppb.BackupInfoResponse, error) {
// 1, trigger inner sync to get the newest backup list in the milvus cluster
if !b.started {
err := b.Start()
if err != nil {
return &backuppb.GetBackupResponse{
return &backuppb.BackupInfoResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_ConnectFailed,
},
}, nil
}
}

resp := &backuppb.GetBackupResponse{
resp := &backuppb.BackupInfoResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_UnexpectedError,
},
Expand All @@ -468,7 +472,7 @@ func (b BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBacku
return resp, nil
}

return &backuppb.GetBackupResponse{
return &backuppb.BackupInfoResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_Success,
},
Expand Down Expand Up @@ -680,7 +684,9 @@ func (b BackupContext) LoadBackup(ctx context.Context, request *backuppb.LoadBac
}

task := &backuppb.LoadCollectionTask{
State: backuppb.LoadState_INTIAL,
LoadState: &backuppb.LoadTaskState{
Code: backuppb.LoadTaskStateCode_LOAD_INITIAL,
},
CollBackup: loadCollection,
TargetCollectionName: targetCollectionName,
PartitionLoadTasks: []*backuppb.LoadPartitionTask{},
Expand All @@ -693,12 +699,12 @@ func (b BackupContext) LoadBackup(ctx context.Context, request *backuppb.LoadBac
for _, task := range loadCollectionTasks {
err := b.executeLoadTask(ctx, backup.GetName(), task)
if err != nil {
task.ErrorMessage = err.Error()
task.State = backuppb.LoadState_FAIL
task.LoadState.ErrorMessage = err.Error()
task.LoadState.Code = backuppb.LoadTaskStateCode_LOAD_FAIL
resp.Status.Reason = err.Error()
return resp, nil
}
task.State = backuppb.LoadState_SUCCESS
task.LoadState.Code = backuppb.LoadTaskStateCode_LOAD_SUCCESS
}

resp.Status.StatusCode = backuppb.StatusCode_Success
Expand All @@ -707,7 +713,7 @@ func (b BackupContext) LoadBackup(ctx context.Context, request *backuppb.LoadBac

func (b BackupContext) executeLoadTask(ctx context.Context, backupName string, task *backuppb.LoadCollectionTask) error {
targetCollectionName := task.GetTargetCollectionName()
task.State = backuppb.LoadState_EXECUTING
task.LoadState.Code = backuppb.LoadTaskStateCode_LOAD_EXECUTING
log.With(zap.String("backupName", backupName))
// create collection
fields := make([]*entity.Field, 0)
Expand Down
14 changes: 2 additions & 12 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
)

const (
BACKUP_PREFIX = "backup"
META_PREFIX = "meta"
BACKUP_META_FILE = "backup_meta.json"
COLLECTION_META_FILE = "collection_meta.json"
Expand Down Expand Up @@ -51,9 +50,6 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
ShardsNum: collectionBack.GetShardsNum(),
ConsistencyLevel: collectionBack.GetConsistencyLevel(),
BackupTimestamp: collectionBack.GetBackupTimestamp(),
BackupStatus: collectionBack.GetBackupStatus(),
BackupError: collectionBack.GetBackupError(),
Health: collectionBack.GetHealth(),
}
collections = append(collections, cloneCollectionBackup)

Expand Down Expand Up @@ -81,12 +77,9 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
Infos: segments,
}
backupLevel := &backuppb.BackupInfo{
Id: backup.GetId(),
BackupState: backup.GetBackupState(),
Name: backup.GetName(),
BackupTimestamp: backup.GetBackupTimestamp(),
BackupStatus: backup.GetBackupStatus(),
BackupError: backup.GetBackupError(),
Health: backup.GetHealth(),
}

return LeveledBackupInfo{
Expand Down Expand Up @@ -130,12 +123,9 @@ func serialize(backup *backuppb.BackupInfo) (*BackupMetaBytes, error) {
// levelToTree rebuild complete tree structure BackupInfo from backup-collection-partition-segment 4-level structure
func levelToTree(level *LeveledBackupInfo) (*backuppb.BackupInfo, error) {
backupInfo := &backuppb.BackupInfo{
Id: level.backupLevel.GetId(),
BackupState: level.backupLevel.GetBackupState(),
Name: level.backupLevel.GetName(),
BackupTimestamp: level.backupLevel.GetBackupTimestamp(),
BackupStatus: level.backupLevel.GetBackupStatus(),
BackupError: level.backupLevel.GetBackupError(),
Health: level.backupLevel.GetHealth(),
}
segmentDict := make(map[string][]*backuppb.SegmentBackupInfo, len(level.segmentLevel.GetInfos()))
for _, segment := range level.segmentLevel.GetInfos() {
Expand Down
Loading

0 comments on commit 05fe91d

Please sign in to comment.