Skip to content

Commit

Permalink
core: concurrency copy
Browse files Browse the repository at this point in the history
  • Loading branch information
huanghaoyuanhhy committed Feb 9, 2023
1 parent 05f5243 commit b9c8bd7
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 49 deletions.
118 changes: 69 additions & 49 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
"github.com/zilliztech/milvus-backup/internal/util/retry"

Expand All @@ -26,6 +27,8 @@ const (
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
WORKER_NUM = 100
RPS = 1000
)

// makes sure BackupContext implements `Backup`
Expand Down Expand Up @@ -569,6 +572,8 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}
}

wp := common.NewWorkerPool(ctx, WORKER_NUM, RPS)
wp.Start()
for _, segment := range segmentBackupInfos {
start := time.Now().Unix()
log.Debug("copy segment",
Expand Down Expand Up @@ -599,32 +604,38 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
return backupInfo, err
}

exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}

err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return backupInfo, err
} else {
log.Debug("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
binlog := binlog
job := func(ctx context.Context) error {
exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}

err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return err
} else {
log.Debug("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
}

return nil
}
wp.Submit(job)
}
}
// delta log
Expand All @@ -650,31 +661,36 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
return backupInfo, err
}

exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return backupInfo, err
}
err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return backupInfo, err
} else {
log.Info("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
binlog := binlog
job := func(ctx context.Context) error {
exist, err := b.storageClient.Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
log.Info("Fail to check file exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
if !exist {
log.Error("Binlog file not exist",
zap.Error(err),
zap.String("file", binlog.GetLogPath()))
return err
}
err = b.storageClient.Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
if err != nil {
log.Info("Fail to copy file",
zap.Error(err),
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
return err
} else {
log.Info("Successfully copy file",
zap.String("from", binlog.GetLogPath()),
zap.String("to", targetPath))
}
return err
}
wp.Submit(job)
}
}
duration := time.Now().Unix() - start
Expand All @@ -684,6 +700,10 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("cost_time", duration))
}
wp.Done()
if err := wp.Wait(); err != nil {
return backupInfo, err
}
refreshBackupMetaFunc(id, leveledBackupInfo)
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ require (
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
golang.org/x/sync v0.1.0
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.3.0 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -663,6 +664,7 @@ golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
59 changes: 59 additions & 0 deletions internal/common/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package common

import (
"context"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

// WorkerPool a pool that can control the total amount and rate of concurrency
type WorkerPool struct {
job chan Job
g *errgroup.Group
subCtx context.Context

workerNum int
lim *rate.Limiter
}

// Job is a unit of work executed by a WorkerPool worker. It is called with the
// pool's context; returning a non-nil error makes the worker that ran it stop
// and return that error.
type Job func(ctx context.Context) error

// NewWorkerPool builds a worker pool whose jobs run under a context derived
// from ctx through errgroup.WithContext.
//
// workerNum is the number of worker goroutines Start will launch. Values below
// 1 are raised to 1: the job channel is unbuffered, so a pool without workers
// would block forever on the first Submit.
//
// rps limits how many jobs may be started per second (burst of 1). A value of
// 0 or less disables rate limiting.
func NewWorkerPool(ctx context.Context, workerNum int, rps int32) *WorkerPool {
	if workerNum < 1 {
		workerNum = 1
	}

	g, subCtx := errgroup.WithContext(ctx)

	var lim *rate.Limiter
	if rps > 0 {
		// One token every 1/rps seconds, with no burst beyond a single job.
		lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1)
	}

	return &WorkerPool{
		job:       make(chan Job),
		workerNum: workerNum,
		g:         g,
		lim:       lim,
		subCtx:    subCtx,
	}
}

// Start launches workerNum worker goroutines through the pool's errgroup; each
// of them runs the work loop.
func (p *WorkerPool) Start() {
	for remaining := p.workerNum; remaining > 0; remaining-- {
		p.g.Go(p.work)
	}
}

// work is the loop run by every worker goroutine. It receives jobs until the
// job channel is closed, waits on the rate limiter (when one is configured)
// before each job, and runs the job with the pool's context. The first error
// from either the limiter or a job ends the loop and is returned.
func (p *WorkerPool) work() error {
	for {
		next, open := <-p.job
		if !open {
			// Channel closed and drained: nothing left to process.
			return nil
		}

		var err error
		if p.lim != nil {
			err = p.lim.Wait(p.subCtx)
		}
		if err == nil {
			err = next(p.subCtx)
		}
		if err != nil {
			return err
		}
	}
}

// Submit hands job to a worker, blocking until one is ready to receive it.
//
// Workers stop receiving once a job or the rate limiter returns an error, so
// after a failure (or after the parent context is cancelled) there may be no
// receiver left and a plain channel send would block forever. Submit therefore
// also watches the pool's context: once it is done, the job is dropped and the
// context error is recorded in the errgroup so that Wait still reports a
// failure instead of a silent success.
//
// Submit must not be called after Done; sending on the closed channel panics.
func (p *WorkerPool) Submit(job Job) {
	select {
	case p.job <- job:
	case <-p.subCtx.Done():
		// The errgroup keeps only the first error, so an earlier job failure
		// is not overwritten by this one.
		p.g.Go(func() error { return p.subCtx.Err() })
	}
}
// Done closes the job channel to signal that no more jobs will be submitted.
// Workers leave their receive loop once the channel is closed and drained.
func (p *WorkerPool) Done() { close(p.job) }
// Wait returns the result of the underlying errgroup's Wait: per the errgroup
// documentation it blocks until every goroutine started through the group has
// returned and yields the first non-nil error among them, if any.
func (p *WorkerPool) Wait() error { return p.g.Wait() }

0 comments on commit b9c8bd7

Please sign in to comment.