Skip to content

Commit

Permalink
enhance: Limit import job number (#36891) (#36892)
Browse files Browse the repository at this point in the history
issue: #36890

pr: #36891

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Oct 18, 2024
1 parent 7847b66 commit 4e0f584
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 49 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ dataCoord:
checkIntervalHigh: 2 # The interval for checking import, measured in seconds, is set to a high frequency for the import checker.
checkIntervalLow: 120 # The interval for checking import, measured in seconds, is set to a low frequency for the import checker.
maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
maxImportJobNum: 1024 # Maximum number of import jobs that are executing or pending.
waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
slot:
Expand Down
22 changes: 22 additions & 0 deletions internal/datacoord/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@ func WithCollectionID(collectionID int64) ImportJobFilter {
}
}

// WithJobStates returns an ImportJobFilter that keeps a job only when its
// current state equals one of the given states.
func WithJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
	return func(job ImportJob) bool {
		current := job.GetState()
		for _, s := range states {
			if s == current {
				return true
			}
		}
		return false
	}
}

// WithoutJobStates returns an ImportJobFilter that keeps a job only when its
// current state matches none of the given states (the complement of WithJobStates).
func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
	return func(job ImportJob) bool {
		current := job.GetState()
		for _, s := range states {
			if s == current {
				return false
			}
		}
		return true
	}
}

type UpdateJobAction func(job ImportJob)

func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {
Expand Down
11 changes: 11 additions & 0 deletions internal/datacoord/import_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ImportMeta interface {
UpdateJob(jobID int64, actions ...UpdateJobAction) error
GetJob(jobID int64) ImportJob
GetJobBy(filters ...ImportJobFilter) []ImportJob
CountJobBy(filters ...ImportJobFilter) int
RemoveJob(jobID int64) error

AddTask(task ImportTask) error
Expand Down Expand Up @@ -124,6 +125,10 @@ func (m *importMeta) GetJob(jobID int64) ImportJob {
// GetJobBy returns every import job that satisfies all of the given filters.
// It takes a read lock and delegates to the unlocked getJobBy helper.
func (m *importMeta) GetJobBy(filters ...ImportJobFilter) []ImportJob {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.getJobBy(filters...)
}

func (m *importMeta) getJobBy(filters ...ImportJobFilter) []ImportJob {
ret := make([]ImportJob, 0)
OUTER:
for _, job := range m.jobs {
Expand All @@ -137,6 +142,12 @@ OUTER:
return ret
}

// CountJobBy returns the number of import jobs that satisfy all of the given
// filters. It takes a read lock and counts the result of the unlocked
// getJobBy helper rather than exposing the jobs themselves.
func (m *importMeta) CountJobBy(filters ...ImportJobFilter) int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.getJobBy(filters...))
}

func (m *importMeta) RemoveJob(jobID int64) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
126 changes: 79 additions & 47 deletions internal/datacoord/import_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package datacoord

import (
"fmt"
"math/rand"
"testing"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -70,7 +72,7 @@ func TestImportMeta_Restore(t *testing.T) {
assert.Error(t, err)
}

func TestImportMeta_ImportJob(t *testing.T) {
func TestImportMeta_Job(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListImportJobs().Return(nil, nil)
catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
Expand All @@ -81,54 +83,66 @@ func TestImportMeta_ImportJob(t *testing.T) {
im, err := NewImportMeta(catalog)
assert.NoError(t, err)

var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
JobID: 0,
CollectionID: 1,
PartitionIDs: []int64{2},
Vchannels: []string{"ch0"},
State: internalpb.ImportJobState_Pending,
},
jobIDs := []int64{1000, 2000, 3000}

for i, jobID := range jobIDs {
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
JobID: jobID,
CollectionID: rand.Int63(),
PartitionIDs: []int64{rand.Int63()},
Vchannels: []string{fmt.Sprintf("ch-%d", rand.Int63())},
State: internalpb.ImportJobState_Pending,
},
}
err = im.AddJob(job)
assert.NoError(t, err)
ret := im.GetJob(jobID)
assert.Equal(t, job, ret)
jobs := im.GetJobBy()
assert.Equal(t, i+1, len(jobs))

// Add again, test idempotency
err = im.AddJob(job)
assert.NoError(t, err)
ret = im.GetJob(jobID)
assert.Equal(t, job, ret)
jobs = im.GetJobBy()
assert.Equal(t, i+1, len(jobs))
}

err = im.AddJob(job)
assert.NoError(t, err)
jobs := im.GetJobBy()
assert.Equal(t, 1, len(jobs))
err = im.AddJob(job)
assert.Equal(t, 3, len(jobs))

err = im.UpdateJob(jobIDs[0], UpdateJobState(internalpb.ImportJobState_Completed))
assert.NoError(t, err)
jobs = im.GetJobBy()
assert.Equal(t, 1, len(jobs))
job0 := im.GetJob(jobIDs[0])
assert.NotNil(t, job0)
assert.Equal(t, internalpb.ImportJobState_Completed, job0.GetState())

assert.Nil(t, job.GetSchema())
err = im.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
err = im.UpdateJob(jobIDs[1], UpdateJobState(internalpb.ImportJobState_Importing))
assert.NoError(t, err)
job2 := im.GetJob(job.GetJobID())
assert.Equal(t, internalpb.ImportJobState_Completed, job2.GetState())
assert.Equal(t, job.GetJobID(), job2.GetJobID())
assert.Equal(t, job.GetCollectionID(), job2.GetCollectionID())
assert.Equal(t, job.GetPartitionIDs(), job2.GetPartitionIDs())
assert.Equal(t, job.GetVchannels(), job2.GetVchannels())

err = im.RemoveJob(job.GetJobID())
job1 := im.GetJob(jobIDs[1])
assert.NotNil(t, job1)
assert.Equal(t, internalpb.ImportJobState_Importing, job1.GetState())

jobs = im.GetJobBy(WithJobStates(internalpb.ImportJobState_Pending))
assert.Equal(t, 1, len(jobs))
jobs = im.GetJobBy(WithoutJobStates(internalpb.ImportJobState_Pending))
assert.Equal(t, 2, len(jobs))
count := im.CountJobBy()
assert.Equal(t, 3, count)
count = im.CountJobBy(WithJobStates(internalpb.ImportJobState_Pending))
assert.Equal(t, 1, count)
count = im.CountJobBy(WithoutJobStates(internalpb.ImportJobState_Pending))
assert.Equal(t, 2, count)

err = im.RemoveJob(jobIDs[0])
assert.NoError(t, err)
jobs = im.GetJobBy()
assert.Equal(t, 0, len(jobs))

// test failed
mockErr := errors.New("mock err")
catalog = mocks.NewDataCoordCatalog(t)
catalog.EXPECT().SaveImportJob(mock.Anything).Return(mockErr)
catalog.EXPECT().DropImportJob(mock.Anything).Return(mockErr)
im.(*importMeta).catalog = catalog

err = im.AddJob(job)
assert.Error(t, err)
im.(*importMeta).jobs[job.GetJobID()] = job
err = im.UpdateJob(job.GetJobID())
assert.Error(t, err)
err = im.RemoveJob(job.GetJobID())
assert.Error(t, err)
assert.Equal(t, 2, len(jobs))
count = im.CountJobBy()
assert.Equal(t, 2, count)
}

func TestImportMeta_ImportTask(t *testing.T) {
Expand Down Expand Up @@ -189,19 +203,37 @@ func TestImportMeta_ImportTask(t *testing.T) {
assert.NoError(t, err)
tasks = im.GetTaskBy()
assert.Equal(t, 1, len(tasks))
}

// test failed
func TestImportMeta_Task_Failed(t *testing.T) {
mockErr := errors.New("mock err")
catalog = mocks.NewDataCoordCatalog(t)
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListImportJobs().Return(nil, nil)
catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
catalog.EXPECT().ListImportTasks().Return(nil, nil)
catalog.EXPECT().SaveImportTask(mock.Anything).Return(mockErr)
catalog.EXPECT().DropImportTask(mock.Anything).Return(mockErr)

im, err := NewImportMeta(catalog)
assert.NoError(t, err)
im.(*importMeta).catalog = catalog

err = im.AddTask(task1)
task := &importTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: 1,
TaskID: 2,
CollectionID: 3,
SegmentIDs: []int64{5, 6},
NodeID: 7,
State: datapb.ImportTaskStateV2_Pending,
},
}

err = im.AddTask(task)
assert.Error(t, err)
im.(*importMeta).tasks[task1.GetTaskID()] = task1
err = im.UpdateTask(task1.GetTaskID(), UpdateNodeID(9))
im.(*importMeta).tasks[task.GetTaskID()] = task
err = im.UpdateTask(task.GetTaskID(), UpdateNodeID(9))
assert.Error(t, err)
err = im.RemoveTask(task1.GetTaskID())
err = im.RemoveTask(task.GetTaskID())
assert.Error(t, err)
}
14 changes: 13 additions & 1 deletion internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,12 +1665,24 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
return len(file.GetPaths()) > 0
})
if len(files) == 0 {
resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
return resp, nil
}
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
}

// Check if the number of jobs exceeds the limit.
maxNum := paramtable.Get().DataCoordCfg.MaxImportJobNum.GetAsInt()
executingNum := s.importMeta.CountJobBy(WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
if executingNum >= maxNum {
resp.Status = merr.Status(merr.WrapErrImportFailed(
fmt.Sprintf("The number of jobs has reached the limit, please try again later. " +
"If your request is set to only import a single file, " +
"please consider importing multiple files in one request for better efficiency.")))
return resp, nil
}

// Allocate file ids.
idStart, _, err := s.allocator.allocN(int64(len(files)) + 1)
if err != nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc id failed, err=%w", err)))
Expand Down
15 changes: 14 additions & 1 deletion internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,12 @@ func TestImportV2(t *testing.T) {
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

// alloc failed
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListImportJobs().Return(nil, nil)
catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
catalog.EXPECT().ListImportTasks().Return(nil, nil)
s.importMeta, err = NewImportMeta(catalog)
assert.NoError(t, err)
alloc := NewNMockAllocator(t)
alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)
s.allocator = alloc
Expand All @@ -1629,7 +1635,7 @@ func TestImportV2(t *testing.T) {
s.allocator = alloc

// add job failed
catalog := mocks.NewDataCoordCatalog(t)
catalog = mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListImportJobs().Return(nil, nil)
catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
catalog.EXPECT().ListImportTasks().Return(nil, nil)
Expand Down Expand Up @@ -1666,6 +1672,13 @@ func TestImportV2(t *testing.T) {
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
jobs = s.importMeta.GetJobBy()
assert.Equal(t, 1, len(jobs))

// number of jobs reached the limit
Params.Save(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key, "1")
resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{})
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
Params.Reset(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key)
})

t.Run("GetImportProgress", func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3227,6 +3227,7 @@ type dataCoordConfig struct {
ImportCheckIntervalHigh ParamItem `refreshable:"true"`
ImportCheckIntervalLow ParamItem `refreshable:"true"`
MaxFilesPerImportReq ParamItem `refreshable:"true"`
MaxImportJobNum ParamItem `refreshable:"true"`
WaitForIndex ParamItem `refreshable:"true"`

GracefulStopTimeout ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3990,6 +3991,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.MaxFilesPerImportReq.Init(base.mgr)

p.MaxImportJobNum = ParamItem{
Key: "dataCoord.import.maxImportJobNum",
Version: "2.4.14",
Doc: "Maximum number of import jobs that are executing or pending.",
DefaultValue: "1024",
PanicIfEmpty: false,
Export: true,
}
p.MaxImportJobNum.Init(base.mgr)

p.WaitForIndex = ParamItem{
Key: "dataCoord.import.waitForIndex",
Version: "2.4.0",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())
assert.Equal(t, 1024, Params.MaxImportJobNum.GetAsInt())
assert.Equal(t, true, Params.WaitForIndex.GetAsBool())

params.Save("datacoord.gracefulStopTimeout", "100")
Expand Down

0 comments on commit 4e0f584

Please sign in to comment.