Skip to content

Commit

Permalink
Revert "enhance: Support db for bulkinsert (#37012) (#37017)" (#37415)
Browse files Browse the repository at this point in the history
This reverts commit d6adc62.

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Nov 4, 2024
1 parent 8370caa commit 399ae75
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 146 deletions.
7 changes: 0 additions & 7 deletions internal/datacoord/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
}
}

func WithDbID(DbID int64) ImportJobFilter {
return func(job ImportJob) bool {
return job.GetDbID() == DbID
}
}

type UpdateJobAction func(job ImportJob)

func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {
Expand Down Expand Up @@ -106,7 +100,6 @@ func UpdateJobCompleteTime(completeTime string) UpdateJobAction {

type ImportJob interface {
GetJobID() int64
GetDbID() int64
GetCollectionID() int64
GetCollectionName() string
GetPartitionIDs() []int64
Expand Down
22 changes: 5 additions & 17 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,9 +1631,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
Status: merr.Success(),
}

log := log.With(
zap.Int64("dbID", in.GetDbID()),
zap.Int64("collection", in.GetCollectionID()),
log := log.With(zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("partitions", in.GetPartitionIDs()),
zap.Strings("channels", in.GetChannelNames()))
log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
Expand Down Expand Up @@ -1692,7 +1690,6 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
job := &importJob{
ImportJob: &datapb.ImportJob{
JobID: idStart,
DbID: in.GetDbID(),
CollectionID: in.GetCollectionID(),
CollectionName: in.GetCollectionName(),
PartitionIDs: in.GetPartitionIDs(),
Expand All @@ -1719,7 +1716,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
}

func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
log := log.With(zap.String("jobID", in.GetJobID()), zap.Int64("dbID", in.GetDbID()))
log := log.With(zap.String("jobID", in.GetJobID()))
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &internalpb.GetImportProgressResponse{
Status: merr.Status(err),
Expand All @@ -1739,10 +1736,6 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
return resp, nil
}
if job.GetDbID() != 0 && job.GetDbID() != in.GetDbID() {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d, dbID=%d", jobID, in.GetDbID())))
return resp, nil
}
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta)
resp.State = state
resp.Reason = reason
Expand Down Expand Up @@ -1773,14 +1766,11 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
}

var jobs []ImportJob
filters := make([]ImportJobFilter, 0)
if req.GetDbID() != 0 {
filters = append(filters, WithDbID(req.GetDbID()))
}
if req.GetCollectionID() != 0 {
filters = append(filters, WithCollectionID(req.GetCollectionID()))
jobs = s.importMeta.GetJobBy(WithCollectionID(req.GetCollectionID()))
} else {
jobs = s.importMeta.GetJobBy()
}
jobs = s.importMeta.GetJobBy(filters...)

for _, job := range jobs {
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta)
Expand All @@ -1790,7 +1780,5 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
resp.Progresses = append(resp.Progresses, progress)
resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName())
}
log.Info("ListImports done", zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("dbID", req.GetDbID()), zap.Any("resp", resp))
return resp, nil
}
40 changes: 3 additions & 37 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,10 +1711,9 @@ func TestImportV2(t *testing.T) {
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

// db does not exist
// normal case
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 1,
JobID: 0,
Schema: &schemapb.CollectionSchema{},
State: internalpb.ImportJobState_Failed,
Expand All @@ -1723,31 +1722,12 @@ func TestImportV2(t *testing.T) {
err = s.importMeta.AddJob(job)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
DbID: 2,
JobID: "0",
})
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

// normal case
job = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 1,
JobID: 0,
Schema: &schemapb.CollectionSchema{},
State: internalpb.ImportJobState_Pending,
},
}
err = s.importMeta.AddJob(job)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
DbID: 1,
JobID: "0",
})
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.Equal(t, int64(10), resp.GetProgress())
assert.Equal(t, internalpb.ImportJobState_Pending, resp.GetState())
assert.Equal(t, int64(0), resp.GetProgress())
assert.Equal(t, internalpb.ImportJobState_Failed, resp.GetState())
})

t.Run("ListImports", func(t *testing.T) {
Expand All @@ -1770,7 +1750,6 @@ func TestImportV2(t *testing.T) {
assert.NoError(t, err)
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 2,
JobID: 0,
CollectionID: 1,
Schema: &schemapb.CollectionSchema{},
Expand All @@ -1787,20 +1766,7 @@ func TestImportV2(t *testing.T) {
}
err = s.importMeta.AddTask(task)
assert.NoError(t, err)
// db id not match
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: 3,
CollectionID: 1,
})
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.Equal(t, 0, len(resp.GetJobIDs()))
assert.Equal(t, 0, len(resp.GetStates()))
assert.Equal(t, 0, len(resp.GetReasons()))
assert.Equal(t, 0, len(resp.GetProgresses()))
// db id match
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: 2,
CollectionID: 1,
})
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {

router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
}

type (
Expand Down
9 changes: 3 additions & 6 deletions internal/distributed/proxy/httpserver/request_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ func (req *ImportReq) GetOptions() map[string]string {
return req.Options
}

type GetImportReq struct {
DbName string `json:"dbName"`
JobID string `json:"jobId" binding:"required"`
type JobIDReq struct {
JobID string `json:"jobId" binding:"required"`
}

func (req *GetImportReq) GetJobID() string { return req.JobID }

func (req *GetImportReq) GetDbName() string { return req.DbName }
func (req *JobIDReq) GetJobID() string { return req.JobID }

type QueryReqV2 struct {
DbName string `json:"dbName"`
Expand Down
1 change: 0 additions & 1 deletion internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ message ImportResponse {
message GetImportProgressRequest {
string db_name = 1;
string jobID = 2;
int64 dbID = 3;
}

message ImportTaskProgress {
Expand Down
30 changes: 1 addition & 29 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6158,7 +6158,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
return &internalpb.ImportResponse{Status: merr.Status(err)}, nil
}
log := log.Ctx(ctx).With(
zap.String("dbName", req.GetDbName()),
zap.String("collectionName", req.GetCollectionName()),
zap.String("partition name", req.GetPartitionName()),
zap.Any("files", req.GetFiles()),
Expand All @@ -6185,11 +6184,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
}
}()

dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
resp.Status = merr.Status(err)
Expand Down Expand Up @@ -6300,7 +6294,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
}
}
importRequest := &internalpb.ImportRequestInternal{
DbID: dbInfo.dbID,
CollectionID: collectionID,
CollectionName: req.GetCollectionName(),
PartitionIDs: partitionIDs,
Expand All @@ -6325,28 +6318,14 @@ func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImp
}, nil
}
log := log.Ctx(ctx).With(
zap.String("dbName", req.GetDbName()),
zap.String("jobID", req.GetJobID()),
)

resp := &internalpb.GetImportProgressResponse{
Status: merr.Success(),
}

method := "GetImportProgress"
tr := timerecord.NewTimeRecorder(method)
log.Info(rpcReceived(method))

// Fill db id for datacoord.
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
req.DbID = dbInfo.dbID

nodeID := fmt.Sprint(paramtable.GetNodeID())
resp, err = node.dataCoord.GetImportProgress(ctx, req)
resp, err := node.dataCoord.GetImportProgress(ctx, req)
if resp.GetStatus().GetCode() != 0 || err != nil {
log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc()
Expand Down Expand Up @@ -6383,11 +6362,6 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
err error
collectionID UniqueID
)
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
if req.GetCollectionName() != "" {
collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
Expand All @@ -6396,9 +6370,7 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
return resp, nil
}
}

resp, err = node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: dbInfo.dbID,
CollectionID: collectionID,
})
if resp.GetStatus().GetCode() != 0 || err != nil {
Expand Down
40 changes: 2 additions & 38 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,17 +1596,8 @@ func TestProxy_ImportV2(t *testing.T) {
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// no such collection
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc := NewMockCache(t)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, mockErr)
globalMetaCache = mc
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
Expand All @@ -1615,7 +1606,6 @@ func TestProxy_ImportV2(t *testing.T) {

// get schema failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
Expand All @@ -1625,7 +1615,6 @@ func TestProxy_ImportV2(t *testing.T) {

// get channel failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
Expand All @@ -1650,7 +1639,6 @@ func TestProxy_ImportV2(t *testing.T) {

// get partitions failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
Expand All @@ -1665,7 +1653,6 @@ func TestProxy_ImportV2(t *testing.T) {

// get partitionID failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{},
Expand All @@ -1678,7 +1665,6 @@ func TestProxy_ImportV2(t *testing.T) {

// no file
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{},
Expand Down Expand Up @@ -1725,18 +1711,7 @@ func TestProxy_ImportV2(t *testing.T) {
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// normal case
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
globalMetaCache = mc
dataCoord := mocks.NewMockDataCoordClient(t)
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(nil, nil)
node.dataCoord = dataCoord
Expand All @@ -1754,19 +1729,8 @@ func TestProxy_ImportV2(t *testing.T) {
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.ListImports(ctx, &internalpb.ListImportsRequest{
CollectionName: "col",
})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// normal case
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc := NewMockCache(t)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
globalMetaCache = mc
dataCoord := mocks.NewMockDataCoordClient(t)
Expand Down
Loading

0 comments on commit 399ae75

Please sign in to comment.