Skip to content

Commit

Permalink
core: concurrency copy
Browse files Browse the repository at this point in the history
  • Loading branch information
huanghaoyuanhhy committed Feb 9, 2023
1 parent 05f5243 commit c9bbea2
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 49 deletions.
118 changes: 69 additions & 49 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
"github.com/zilliztech/milvus-backup/internal/util/retry"

Expand All @@ -26,6 +27,8 @@ const (
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
WORKER_NUM = 100
RPS = 1000
)

// makes sure BackupContext implements `Backup`
Expand Down Expand Up @@ -569,6 +572,8 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}
}

wp := common.NewWorkerPool(ctx, WORKER_NUM, RPS)
wp.Start()
for _, segment := range segmentBackupInfos {
start := time.Now().Unix()
log.Debug("copy segment",
Expand Down Expand Up @@ -599,32 +604,38 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
return backupInfo, err
}

exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}

err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return backupInfo, err
} else {
log.Debug("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
binlog := binlog
// job copies one binlog file from the Milvus bucket to the backup bucket.
// It is submitted to the worker pool, so it receives the context the pool passes in
// rather than the enclosing function's ctx.
job := func(ctx context.Context) error {
exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
if !exist {
// NOTE(review): err is always nil here (the non-nil case returned above), so
// zap.Error(err) logs nothing useful and `return err` reports success for a
// missing binlog file. This probably should return a real error — confirm.
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}

err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return err
} else {
log.Debug("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
}

return nil
}
wp.Submit(job)
}
}
// delta log
Expand All @@ -650,31 +661,36 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
return backupInfo, err
}

exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return backupInfo, err
} else {
log.Info("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
binlog := binlog
// job copies one delta log file from the Milvus bucket to the backup bucket.
// It is submitted to the worker pool, so it receives the context the pool passes in
// rather than the enclosing function's ctx.
job := func(ctx context.Context) error {
exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
if !exist {
// NOTE(review): err is always nil here (the non-nil case returned above), so
// zap.Error(err) logs nothing useful and `return err` reports success for a
// missing file. This probably should return a real error — confirm.
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return err
} else {
log.Info("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
}
// err is nil on this path: the failing Copy case returned above.
return err
}
wp.Submit(job)
}
}
duration := time.Now().Unix() - start
Expand All @@ -684,6 +700,10 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("cost_time", duration))
}
wp.Done()
if err := wp.Wait(); err != nil {
return backupInfo, err
}
refreshBackupMetaFunc(id, leveledBackupInfo)
}

Expand Down
59 changes: 59 additions & 0 deletions internal/common/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package common

import (
"context"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

// WorkerPool is a pool that bounds both how many jobs run concurrently and
// how fast they are started.
type WorkerPool struct {
// job is the queue the workers receive from; Done closes it.
job chan Job
// g tracks the worker goroutines and is what Wait blocks on.
g *errgroup.Group
// subCtx is the context derived by errgroup.WithContext in NewWorkerPool; it is
// handed to the rate limiter and to every job.
subCtx context.Context

// workerNum is the number of goroutines Start launches.
workerNum int
// lim throttles job starts; nil means no rate limit is applied.
lim *rate.Limiter
}

// Job is a unit of work executed by a WorkerPool worker. It is called with the
// pool's derived context; a non-nil error makes the worker that ran it return that error.
type Job func(ctx context.Context) error

// NewWorkerPool builds a worker pool whose jobs run under a context derived
// from ctx via errgroup.WithContext.
//
// workerNum is the number of worker goroutines Start will launch. Values below
// one are raised to one: the job queue is unbuffered, so a pool without workers
// would block the first Submit forever.
//
// rps limits how many jobs may be started per second (burst of 1); an rps of 0
// disables rate limiting.
func NewWorkerPool(ctx context.Context, workerNum int, rps int32) *WorkerPool {
	if workerNum < 1 {
		workerNum = 1
	}

	g, subCtx := errgroup.WithContext(ctx)

	var lim *rate.Limiter
	if rps != 0 {
		lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1)
	}

	return &WorkerPool{
		job:       make(chan Job),
		g:         g,
		subCtx:    subCtx,
		workerNum: workerNum,
		lim:       lim,
	}
}

// Start launches workerNum worker goroutines in the pool's errgroup. Each of
// them runs work until the job queue is closed or a job fails.
func (p *WorkerPool) Start() {
	for remaining := p.workerNum; remaining > 0; remaining-- {
		p.g.Go(p.work)
	}
}

// work is the loop run by every worker goroutine. It takes jobs off the queue
// until the queue is closed, waiting on the rate limiter (when one is
// configured) before running each job. The first limiter or job error ends the
// loop and is returned to the errgroup.
func (p *WorkerPool) work() error {
	throttled := p.lim != nil

	for {
		next, open := <-p.job
		if !open {
			// Queue closed and drained: normal shutdown.
			return nil
		}

		if throttled {
			if err := p.lim.Wait(p.subCtx); err != nil {
				return err
			}
		}

		if err := next(p.subCtx); err != nil {
			return err
		}
	}
}

// Submit hands job to a worker, blocking until one is free to take it.
//
// If the pool's context is cancelled first (a previous job failed, or the
// parent context was cancelled), the workers may already have exited and
// nobody would ever receive from the queue. In that case the job is not run;
// instead the context error is recorded in the errgroup so that Wait reports
// the failure rather than the producer blocking forever.
//
// Submit must be called before Done and Wait.
func (p *WorkerPool) Submit(job Job) {
	select {
	case p.job <- job:
	case <-p.subCtx.Done():
		err := p.subCtx.Err()
		// The errgroup keeps only the first error, so an earlier job
		// failure still takes precedence over this cancellation error.
		p.g.Go(func() error { return err })
	}
}
// Done closes the job queue, signalling that no more jobs will be submitted.
// Workers return once they have drained it.
func (p *WorkerPool) Done() {
	close(p.job)
}
// Wait blocks until every worker goroutine has returned and yields the result
// of the underlying errgroup's Wait.
func (p *WorkerPool) Wait() error {
	err := p.g.Wait()
	return err
}

0 comments on commit c9bbea2

Please sign in to comment.