Skip to content

Commit

Permalink
Support progress (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink authored Nov 29, 2022
1 parent 9d4c3e4 commit a5fa980
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 178 deletions.
76 changes: 54 additions & 22 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,17 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}

for _, partition := range partitions {
partitionSegments := partSegInfoMap[partition.ID]
var size int64 = 0
for _, seg := range partitionSegments {
size += seg.GetSize()
}
partitionBackupInfo := &backuppb.PartitionBackupInfo{
PartitionId: partition.ID,
PartitionName: partition.Name,
CollectionId: collection.GetCollectionId(),
SegmentBackups: partSegInfoMap[partition.ID],
Size: size,
}
partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo)
partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo)
Expand Down Expand Up @@ -517,6 +523,12 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}
refreshBackupMetaFunc(id, leveledBackupInfo)
}

var backupSize int64 = 0
for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() {
backupSize += coll.GetSize()
}
backupInfo.Size = backupSize
backupInfo, err := refreshBackupMetaFunc(id, leveledBackupInfo)
if err != nil {
return backupInfo, err
Expand Down Expand Up @@ -832,17 +844,26 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
return resp
}

var toRestoreSize int64 = 0
for _, partitionBackup := range restoreCollection.GetPartitionBackups() {
toRestoreSize += partitionBackup.GetSize()
}
id := utils.UUID()

restoreCollectionTask := &backuppb.RestoreCollectionTask{
Id: id,
StateCode: backuppb.RestoreTaskStateCode_INITIAL,
StartTime: time.Now().Unix(),
CollBackup: restoreCollection,
TargetCollectionName: targetCollectionName,
PartitionRestoreTasks: []*backuppb.RestorePartitionTask{},
ToRestoreSize: toRestoreSize,
RestoredSize: 0,
Progress: 0,
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
task.ToRestoreSize = task.GetToRestoreSize() + toRestoreSize
}

if request.Async {
Expand Down Expand Up @@ -875,6 +896,7 @@ func (b BackupContext) executeRestoreBackupTask(ctx context.Context, backup *bac

id := task.GetId()
b.restoreTasks[id] = task
task.StateCode = backuppb.RestoreTaskStateCode_EXECUTING

log.With(zap.String("id", id),
zap.String("backup_name", backup.GetName()))
Expand All @@ -887,7 +909,7 @@ func (b BackupContext) executeRestoreBackupTask(ctx context.Context, backup *bac

// 3, execute restoreCollectionTasks
for _, restoreCollectionTask := range restoreCollectionTasks {
_, err := b.executeRestoreCollectionTask(ctx, backup.GetName(), restoreCollectionTask)
endTask, err := b.executeRestoreCollectionTask(ctx, backup.GetName(), restoreCollectionTask)
log.Info("end restore", zap.String("collection_name", restoreCollectionTask.GetTargetCollectionName()))
if err != nil {
log.Error("executeRestoreCollectionTask failed",
Expand All @@ -896,6 +918,9 @@ func (b BackupContext) executeRestoreBackupTask(ctx context.Context, backup *bac
return task, err
}
restoreCollectionTask.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
task.RestoredSize += endTask.RestoredSize
task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize())
updateRestoreTaskFunc(id, task)
}

task.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
Expand Down Expand Up @@ -977,6 +1002,8 @@ func (b BackupContext) executeRestoreCollectionTask(ctx context.Context, backupN
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}

return task, err
Expand Down Expand Up @@ -1103,6 +1130,7 @@ func (b BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
PartitionId: partitionID,
NumOfRows: numOfRows,
}
var size int64 = 0

insertPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "insert_log", collecitonID, partitionID, segmentID)
log.Debug("insertPath", zap.String("insertPath", insertPath))
Expand All @@ -1119,6 +1147,7 @@ func (b BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
LogPath: binlogPath,
LogSize: sizes[index],
})
size += sizes[index]
}
insertLogs = append(insertLogs, &backuppb.FieldBinlog{
FieldID: fieldId,
Expand All @@ -1139,6 +1168,7 @@ func (b BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
LogPath: binlogPath,
LogSize: sizes[index],
})
size += sizes[index]
}
deltaLogs = append(deltaLogs, &backuppb.FieldBinlog{
FieldID: fieldId,
Expand All @@ -1151,29 +1181,31 @@ func (b BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID)
statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
statsLogs := make([]*backuppb.FieldBinlog, 0)
for _, statsFieldLogDir := range statsFieldsLogDir {
binlogPaths, sizes, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsFieldLogDir, false)
fieldIdStr := strings.Replace(strings.Replace(statsFieldLogDir, statsLogPath, "", 1), SEPERATOR, "", -1)
fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
binlogs := make([]*backuppb.Binlog, 0)
for index, binlogPath := range binlogPaths {
binlogs = append(binlogs, &backuppb.Binlog{
LogPath: binlogPath,
LogSize: sizes[index],
})
}
statsLogs = append(statsLogs, &backuppb.FieldBinlog{
FieldID: fieldId,
Binlogs: binlogs,
})
}
//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID)
//statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
//statsLogs := make([]*backuppb.FieldBinlog, 0)
//for _, statsFieldLogDir := range statsFieldsLogDir {
// binlogPaths, sizes, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsFieldLogDir, false)
// fieldIdStr := strings.Replace(strings.Replace(statsFieldLogDir, statsLogPath, "", 1), SEPERATOR, "", -1)
// fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
// binlogs := make([]*backuppb.Binlog, 0)
// for index, binlogPath := range binlogPaths {
// binlogs = append(binlogs, &backuppb.Binlog{
// LogPath: binlogPath,
// LogSize: sizes[index],
// })
// }
// statsLogs = append(statsLogs, &backuppb.FieldBinlog{
// FieldID: fieldId,
// Binlogs: binlogs,
// })
//}

segmentBackupInfo.Binlogs = insertLogs
segmentBackupInfo.Deltalogs = deltaLogs
segmentBackupInfo.Statslogs = statsLogs
//segmentBackupInfo.Statslogs = statsLogs

segmentBackupInfo.Size = size
return &segmentBackupInfo, nil
}

Expand Down Expand Up @@ -1207,7 +1239,7 @@ func (b *BackupContext) GetRestore(ctx context.Context, request *backuppb.GetRes
if value, ok := b.restoreTasks[request.GetId()]; ok {
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = value
resp.Data = UpdateRestoreBackupTask(value)
return resp
} else {
resp.Code = backuppb.ResponseCode_Fail
Expand Down
32 changes: 26 additions & 6 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
ShardsNum: collectionBack.GetShardsNum(),
ConsistencyLevel: collectionBack.GetConsistencyLevel(),
BackupTimestamp: collectionBack.GetBackupTimestamp(),
Size: collectionBack.GetSize(),
}
collections = append(collections, cloneCollectionBackup)

Expand All @@ -59,6 +60,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
PartitionId: partitionBack.GetPartitionId(),
PartitionName: partitionBack.GetPartitionName(),
CollectionId: partitionBack.GetCollectionId(),
Size: partitionBack.GetSize(),
}
partitions = append(partitions, clonePartitionBackupInfo)

Expand Down Expand Up @@ -137,6 +139,7 @@ func levelToTree(level *LeveledBackupInfo) (*backuppb.BackupInfo, error) {
Progress: level.backupLevel.GetProgress(),
Name: level.backupLevel.GetName(),
BackupTimestamp: level.backupLevel.GetBackupTimestamp(),
Size: level.backupLevel.GetSize(),
}
segmentDict := make(map[string][]*backuppb.SegmentBackupInfo, len(level.segmentLevel.GetInfos()))
for _, segment := range level.segmentLevel.GetInfos() {
Expand All @@ -152,7 +155,13 @@ func levelToTree(level *LeveledBackupInfo) (*backuppb.BackupInfo, error) {
}

for _, collection := range level.collectionLevel.GetInfos() {
collPartitions := partitionDict[collection.GetCollectionId()]
var size int64 = 0
for _, part := range collPartitions {
size += part.GetSize()
}
collection.PartitionBackups = partitionDict[collection.GetCollectionId()]
collection.Size = size
}

backupInfo.CollectionBackups = level.collectionLevel.GetInfos()
Expand Down Expand Up @@ -275,12 +284,13 @@ func SimpleRestoreResponse(input *backuppb.RestoreBackupResponse) *backuppb.Rest
}

simpleRestore := &backuppb.RestoreBackupTask{
Id: restore.GetId(),
StateCode: restore.GetStateCode(),
ErrorMessage: restore.GetErrorMessage(),
StartTime: restore.GetStartTime(),
EndTime: restore.GetEndTime(),
Progress: restore.GetProgress(),
Id: restore.GetId(),
StateCode: restore.GetStateCode(),
ErrorMessage: restore.GetErrorMessage(),
StartTime: restore.GetStartTime(),
EndTime: restore.GetEndTime(),
CollectionRestoreTasks: collectionRestores,
Progress: restore.GetProgress(),
}

return &backuppb.RestoreBackupResponse{
Expand All @@ -290,3 +300,13 @@ func SimpleRestoreResponse(input *backuppb.RestoreBackupResponse) *backuppb.Rest
Data: simpleRestore,
}
}

func UpdateRestoreBackupTask(input *backuppb.RestoreBackupTask) *backuppb.RestoreBackupTask {
var storedSize int64 = 0
for _, coll := range input.GetCollectionRestoreTasks() {
storedSize += coll.GetRestoredSize()
}
progress := int32(100 * storedSize / input.ToRestoreSize)
input.Progress = progress
return input
}
20 changes: 14 additions & 6 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message CollectionBackupInfo {
ConsistencyLevel consistency_level = 12;
repeated PartitionBackupInfo partition_backups = 13;
uint64 backup_timestamp = 14;
int64 size = 15;
}

message PartitionBackupInfo {
Expand All @@ -53,6 +54,7 @@ message PartitionBackupInfo {
int64 collection_id = 3;
// array of segment backup
repeated SegmentBackupInfo segment_backups = 7;
int64 size = 8;
}

/**
Expand All @@ -66,6 +68,7 @@ message SegmentBackupInfo {
repeated FieldBinlog binlogs = 5;
repeated FieldBinlog statslogs = 6;
repeated FieldBinlog deltalogs = 7;
int64 size = 8;
}

/**
Expand All @@ -83,6 +86,7 @@ message BackupInfo {
uint64 backup_timestamp = 8;
// array of collection backup
repeated CollectionBackupInfo collection_backups = 9;
int64 size = 10;
}

/**
Expand Down Expand Up @@ -226,10 +230,12 @@ message RestoreCollectionTask {
string errorMessage = 3;
int64 start_time = 4;
int64 end_time = 5;
int32 progress = 6;
CollectionBackupInfo coll_backup = 7;
string target_collection_name = 8;
repeated RestorePartitionTask partition_restore_tasks = 9;
CollectionBackupInfo coll_backup = 6;
string target_collection_name = 7;
repeated RestorePartitionTask partition_restore_tasks = 8;
int64 restored_size = 9;
int64 to_restore_size = 10;
int32 progress = 11;
}

message RestoreBackupTask {
Expand All @@ -238,8 +244,10 @@ message RestoreBackupTask {
string errorMessage = 3;
int64 start_time = 4;
int64 end_time = 5;
int32 progress = 6;
repeated RestoreCollectionTask collection_restore_tasks = 7;
repeated RestoreCollectionTask collection_restore_tasks = 6;
int64 restored_size = 7;
int64 to_restore_size = 8;
int32 progress = 9;
}

message RestoreBackupResponse {
Expand Down
Loading

0 comments on commit a5fa980

Please sign in to comment.