Skip to content

Commit

Permalink
Develop CreateBackup
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Aug 18, 2022
1 parent 85e526c commit 8ccb864
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 56 deletions.
19 changes: 0 additions & 19 deletions core/backup.go

This file was deleted.

128 changes: 102 additions & 26 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package core

import (
"context"
"github.com/zilliztech/milvus-backup/internal/proto/datapb"
"github.com/zilliztech/milvus-backup/internal/util/typeutil"
"sync"
"time"

gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
Expand All @@ -14,8 +17,23 @@ import (
"github.com/zilliztech/milvus-backup/internal/proto/commonpb"
"github.com/zilliztech/milvus-backup/internal/proto/schemapb"
"github.com/zilliztech/milvus-backup/internal/util/paramtable"

dcc "github.com/zilliztech/milvus-backup/internal/distributed/datacoord/client"
)

type Backup interface {
// Create backuppb
CreateBackup(context.Context, *backuppb.CreateBackupRequest) (*backuppb.CreateBackupResponse, error)
// Get backuppb with the chosen name
GetBackup(context.Context, *backuppb.GetBackupRequest) (*backuppb.GetBackupResponse, error)
// List backups that contains the given collection name, if collection is not given, return all backups in the cluster
ListBackups(context.Context, *backuppb.ListBackupsRequest) (*backuppb.ListBackupsResponse, error)
// Delete backuppb by given backuppb name
DeleteBackup(context.Context, *backuppb.DeleteBackupRequest) (*backuppb.DeleteBackupResponse, error)
// Load backuppb to milvus, return backuppb load report
LoadBackup(context.Context, *backuppb.LoadBackupRequest) (*backuppb.LoadBackupResponse, error)
}

// makes sure BackupContext implements `Backup`
var _ Backup = (*BackupContext)(nil)

Expand All @@ -29,39 +47,43 @@ type BackupContext struct {
milvusClient gomilvus.Client
//milvusProxyClient proxy.Client
//milvusRootCoordClient *rcc.Client
//milvusDataCoordClient *dcc.Client
milvusDataCoordClient *dcc.Client
//metaClient etcdclient
//storageClient minioclient
started bool
}

func (b *BackupContext) startConnection() error {
milvusAddr := b.milvusSource.GetParams().ProxyCfg.NetworkAddress
c, err := gomilvus.NewGrpcClient(b.ctx, milvusAddr)
func (b *BackupContext) Start() error {
c, err := gomilvus.NewGrpcClient(b.ctx, b.milvusSource.GetProxyAddr())
if err != nil {
log.Error("failed to connect to milvus", zap.Error(err))
return err
}
b.milvusClient = c
//dataCoordClient, err := dcc.NewClient(b.ctx)
//if err != nil {
// log.Error("failed to connect to milvus's datacoord", zap.Error(err))
// return err
//}
//b.milvusDataCoordClient = dataCoordClient
//

dataCoordClient, err := dcc.NewClient(b.ctx, b.milvusSource.GetDatacoordAddr())
if err != nil {
log.Error("failed to connect to milvus's datacoord", zap.Error(err))
return err
}
b.milvusDataCoordClient = dataCoordClient
b.milvusDataCoordClient.Init()
b.milvusDataCoordClient.Start()
//rootCoordClient, err := rcc.NewClient(b.ctx)
//if err != nil {
// log.Error("failed to connect to milvus's rootcoord", zap.Error(err))
// return err
//}
//b.milvusRootCoordClient = rootCoordClient

b.started = true
return nil
}

func (b *BackupContext) closeConnection() error {
func (b *BackupContext) Close() error {
err := b.milvusClient.Close()
//err = b.milvusRootCoordClient.Stop()
//err = b.milvusDataCoordClient.Stop()
err = b.milvusDataCoordClient.Stop()
return err
}

Expand All @@ -70,10 +92,20 @@ func (b *BackupContext) GetMilvusSource() *MilvusSource {
}

func CreateBackupContext(ctx context.Context, params paramtable.ComponentParam) *BackupContext {
var Params paramtable.GrpcServerConfig
Params.InitOnce(typeutil.ProxyRole)
milvusAddr := Params.GetAddress()

var Params2 paramtable.GrpcServerConfig
Params2.InitOnce(typeutil.DataCoordRole)
milvusDatacoordAddr := Params2.GetAddress()

return &BackupContext{
ctx: ctx,
milvusSource: &MilvusSource{
params: params,
params: params,
proxyAddr: milvusAddr,
datacoordAddr: milvusDatacoordAddr,
},
}
}
Expand All @@ -84,11 +116,13 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
b.mu.Lock()
defer b.mu.Unlock()

err := b.startConnection()
if err != nil {
return &backuppb.CreateBackupResponse{
Status: &backuppb.Status{StatusCode: backuppb.StatusCode_ConnectFailed},
}, nil
if !b.started {
err := b.Start()
if err != nil {
return &backuppb.CreateBackupResponse{
Status: &backuppb.Status{StatusCode: backuppb.StatusCode_ConnectFailed},
}, nil
}
}

leveledBackupInfo := &LeveledBackupInfo{}
Expand Down Expand Up @@ -124,7 +158,7 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat

log.Info("collections to backup", zap.Any("collections", toBackupCollections))

collectionBackupInfos := make([]*backuppb.CollectionBackupInfo, len(toBackupCollections))
collectionBackupInfos := make([]*backuppb.CollectionBackupInfo, 0)
for _, collection := range toBackupCollections {
// list collection result is not complete
completeCollection, err := b.milvusClient.DescribeCollection(b.ctx, collection.Name)
Expand Down Expand Up @@ -164,14 +198,14 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
}

// 2, get partition level meta
paritionBackupInfos := make([]*backuppb.PartitionBackupInfo, len(toBackupCollections))
partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0)
for _, collection := range toBackupCollections {
paritions, err := b.milvusClient.ShowPartitions(b.ctx, collection.Name)
partitions, err := b.milvusClient.ShowPartitions(b.ctx, collection.Name)
if err != nil {
return nil, err
}
for _, partition := range paritions {
paritionBackupInfos = append(paritionBackupInfos, &backuppb.PartitionBackupInfo{
for _, partition := range partitions {
partitionBackupInfos = append(partitionBackupInfos, &backuppb.PartitionBackupInfo{
PartitionId: partition.ID,
PartitionName: partition.Name,
CollectionId: collection.ID,
Expand All @@ -181,19 +215,61 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
}
}
leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{
Infos: paritionBackupInfos,
Infos: partitionBackupInfos,
}

log.Info("Finish build backup collection meta")
// 3, Flush

// 4, get segment level meta
// todo go sdk 没有ShowSegments方法
segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
for _, part := range partitionBackupInfos {
collectionID := part.GetCollectionId()
partitionID := part.GetPartitionId()
resp, err := b.milvusDataCoordClient.GetRecoveryInfo(ctx, &datapb.GetRecoveryInfoRequest{
CollectionID: collectionID,
PartitionID: partitionID,
})
if err != nil || resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, err
}
for _, binlogs := range resp.GetBinlogs() {
segmentBackupInfos = append(segmentBackupInfos, &backuppb.SegmentBackupInfo{
SegmentId: binlogs.GetSegmentID(),
CollectionId: collectionID,
PartitionId: partitionID,
NumOfRows: binlogs.GetNumOfRows(),
Binlogs: binlogs.GetFieldBinlogs(),
Deltalogs: binlogs.GetDeltalogs(),
Statslogs: binlogs.GetStatslogs(),
})
}
}
leveledBackupInfo.segmentLevel = &backuppb.SegmentLevelBackupInfo{
Infos: segmentBackupInfos,
}

// 5, copy data

// 5, wrap meta
// 6,wrap meta
completeBackupInfo, err := levelToTree(leveledBackupInfo)
if err != nil {
return nil, err
}

completeBackupInfo.BackupStatus = backuppb.StatusCode_Success
completeBackupInfo.BackupTimestamp = uint64(time.Now().Unix())
completeBackupInfo.Name = request.BackupName
// todo generate ID
completeBackupInfo.Id = 0

output, _ := serialize(completeBackupInfo)
log.Info(string(output.BackupMetaBytes))
log.Info(string(output.CollectionMetaBytes))
log.Info(string(output.PartitionMetaBytes))
log.Info(string(output.SegmentMetaBytes))

return &backuppb.CreateBackupResponse{
Status: &backuppb.Status{
StatusCode: backuppb.StatusCode_Success,
Expand Down
20 changes: 20 additions & 0 deletions core/backup_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package core

import (
"context"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/internal/util/paramtable"
"testing"
)

func TestCreateBackup(t *testing.T) {
var params paramtable.ComponentParam
params.InitOnce()
context := context.Background()
backup := CreateBackupContext(context, params)

req := &backuppb.CreateBackupRequest{
BackupName: "test_backup",
}
backup.CreateBackup(context, req)
}
12 changes: 6 additions & 6 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,24 @@ func serialize(backup *backuppb.BackupInfo) (*BackupMetaBytes, error) {
// levelToTree rebuild complete tree structure BackupInfo from backup-collection-partition-segment 4-level structure
func levelToTree(level *LeveledBackupInfo) (*backuppb.BackupInfo, error) {
backupInfo := &backuppb.BackupInfo{}
segmentDict := make(map[string][]*backuppb.SegmentBackupInfo, len(level.segmentLevel.Infos))
for _, segment := range level.segmentLevel.Infos {
segmentDict := make(map[string][]*backuppb.SegmentBackupInfo, len(level.segmentLevel.GetInfos()))
for _, segment := range level.segmentLevel.GetInfos() {
unqiueId := fmt.Sprintf("%d-%d", segment.GetCollectionId(), segment.GetPartitionId())
segmentDict[unqiueId] = append(segmentDict[unqiueId], segment)
}

partitionDict := make(map[int64][]*backuppb.PartitionBackupInfo, len(level.partitionLevel.Infos))
for _, partition := range level.partitionLevel.Infos {
partitionDict := make(map[int64][]*backuppb.PartitionBackupInfo, len(level.partitionLevel.GetInfos()))
for _, partition := range level.partitionLevel.GetInfos() {
unqiueId := partition.GetCollectionId()
partition.SegmentBackups = segmentDict[fmt.Sprintf("%d-%d", partition.GetCollectionId(), partition.GetPartitionId())]
partitionDict[unqiueId] = append(partitionDict[unqiueId], partition)
}

for _, collection := range level.collectionLevel.Infos {
for _, collection := range level.collectionLevel.GetInfos() {
collection.PartitionBackups = partitionDict[collection.GetCollectionId()]
}

backupInfo.CollectionBackups = level.collectionLevel.Infos
backupInfo.CollectionBackups = level.collectionLevel.GetInfos()
return backupInfo, nil
}

Expand Down
16 changes: 11 additions & 5 deletions core/milvus_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@ package core
import "github.com/zilliztech/milvus-backup/internal/util/paramtable"

type MilvusSource struct {
params paramtable.ComponentParam
params paramtable.ComponentParam
proxyAddr string
datacoordAddr string
}

func (m *MilvusSource) GetParams() paramtable.ComponentParam {
return m.params
func (m *MilvusSource) GetProxyAddr() string {
return m.proxyAddr
}

func (m *MilvusSource) GetDatacoordAddr() string {
return m.datacoordAddr
}

func (m *MilvusSource) SetParams(params paramtable.ComponentParam) {
m.params = params
func (m *MilvusSource) GetParams() paramtable.ComponentParam {
return m.params
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.1
github.com/milvus-io/milvus-sdk-go/v2 v2.1.0
github.com/minio/minio-go/v7 v7.0.10
github.com/pkg/errors v0.9.1
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/spf13/cast v1.3.1
Expand All @@ -22,6 +23,8 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

require github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76

replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.8
github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.1.1
Expand Down

0 comments on commit 8ccb864

Please sign in to comment.