Skip to content

Commit

Permalink
Update bulkinsert timeout logic
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Feb 2, 2023
1 parent ac8731a commit bde67f6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
24 changes: 19 additions & 5 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
)

const (
BULKINSERT_TIMEOUT = 10 * 60
BULKINSERT_SLEEP_INTERVAL = 3
BULKINSERT_TIMEOUT = 60 * 60
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
)
Expand Down Expand Up @@ -1228,16 +1228,30 @@ func (b BackupContext) executeBulkInsert(ctx context.Context, coll string, parti
}

func (b BackupContext) watchBulkInsertState(ctx context.Context, taskId int64, timeout int64, sleepSeconds int) error {
start := time.Now().Unix()
for time.Now().Unix()-start < timeout {
lastProgress := 0
lastUpdateTime := time.Now().Unix()
for {
importTaskState, err := b.milvusClient.GetBulkInsertState(ctx, taskId)
log.Debug("bulkinsert task state", zap.Int64("id", taskId), zap.Any("state", importTaskState))
currentTimestamp := time.Now().Unix()
log.Info("bulkinsert task state",
zap.Int64("id", taskId),
zap.Any("state", importTaskState),
zap.Int("progress", importTaskState.Progress()),
zap.Int64("currentTimestamp", currentTimestamp),
zap.Int64("lastUpdateTime", lastUpdateTime))
switch importTaskState.State {
case entity.BulkInsertFailed:
return err
case entity.BulkInsertCompleted:
return nil
default:
currentProgress := importTaskState.Progress()
if currentProgress > lastProgress {
lastUpdateTime = time.Now().Unix()
} else if (currentTimestamp - lastUpdateTime) >= timeout {
log.Warn(fmt.Sprintf("bulkinsert task state progress hang for more than %d s", timeout))
return errors.New("import task timeout")
}
time.Sleep(time.Second * time.Duration(sleepSeconds))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/gin-gonic/gin v1.8.1
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.1
github.com/milvus-io/milvus-sdk-go/v2 v2.1.0
github.com/milvus-io/milvus-sdk-go/v2 v2.2.0
github.com/minio/minio-go/v7 v7.0.17
github.com/pkg/errors v0.9.1
github.com/sony/sonyflake v1.1.0
Expand Down Expand Up @@ -42,6 +42,6 @@ require (

replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.8
github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.2.6
github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.2.12
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/wayblink/milvus-sdk-go/v2 v2.2.6 h1:pluRKnnID48bD+XsK9Pmt2wZfI7brikBZuQS1ziJvxQ=
github.com/wayblink/milvus-sdk-go/v2 v2.2.6/go.mod h1:j6gYkAcGGP2dsrnbsjmKCk8y+fZwhgPCsidL6GBA4SE=
github.com/wayblink/milvus-sdk-go/v2 v2.2.12 h1:h0UvUs4SWeIQU/BOuvnXzYhnerVJ6BzunAGviRkUw5A=
github.com/wayblink/milvus-sdk-go/v2 v2.2.12/go.mod h1:4V+s61EAKSc104exqgGEmiTnTyqUDHG2nxTa4Lflz0U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down

0 comments on commit bde67f6

Please sign in to comment.