Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix collection leak in querynode #36927

Closed
wants to merge 9 commits into from
6 changes: 3 additions & 3 deletions internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func (job *ReleaseCollectionJob) Execute() error {
return errors.Wrap(err, msg)
}

job.targetObserver.ReleaseCollection(req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
cleanQueryNodesCollectionMetaCache(job.ctx, job.meta, job.cluster, req.GetCollectionID())
err = job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
if err != nil {
msg := "failed to remove replicas"
log.Warn(msg, zap.Error(err))
}

job.targetObserver.ReleaseCollection(req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc()
Expand Down
11 changes: 4 additions & 7 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

const (
defaultVecFieldID = 1
defaultIndexID = 1
)

type JobSuite struct {
suite.Suite

Expand Down Expand Up @@ -141,7 +136,10 @@ func (suite *JobSuite) SetupSuite() {
Return(merr.Success(), nil)
suite.cluster.EXPECT().
ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil).Maybe()
Return(merr.Success(), nil)
suite.cluster.EXPECT().
ReleaseCollection(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)

suite.proxyManager = proxyutil.NewMockProxyClientManager(suite.T())
suite.proxyManager.EXPECT().InvalidateCollectionMetaCache(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
Expand Down Expand Up @@ -849,7 +847,6 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,

suite.checkerController,
)
suite.scheduler.Add(job)
Expand Down
24 changes: 24 additions & 0 deletions internal/querycoordv2/job/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,27 @@
}
}
}

// cleanQueryNodesCollectionMetaCache sends a ReleaseCollection request for the
// given collection to every node of every replica that the replica manager
// currently reports for that collection.
//
// The call is best-effort: a failed RPC (or a non-success status, as judged by
// merr.CheckRPCCall) is only logged as a warning and the loop moves on to the
// next node. Nothing is returned to the caller.
func cleanQueryNodesCollectionMetaCache(ctx context.Context,
meta *meta.Meta,
cluster session.Cluster,
collection int64,
) {
log := log.Ctx(ctx).With(zap.Int64("collection", collection))
// The node list is derived from the replicas, so the replicas of the
// collection must still be registered in the replica manager at this point.
replicas := meta.ReplicaManager.GetByCollection(collection)
// A single request value is built once and passed to every node.
releaseReq := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: collection,
}
for _, replica := range replicas {
for _, node := range replica.GetNodes() {
status, err := cluster.ReleaseCollection(ctx, node, releaseReq)
// Fold the transport error and the returned status into one error value.
err = merr.CheckRPCCall(status, err)
if err != nil {
log.Warn("failed to release collection", zap.Int64("node", node), zap.Error(err))
}

Check warning on line 170 in internal/querycoordv2/job/utils.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/job/utils.go#L169-L170

Added lines #L169 - L170 were not covered by tests
}
}
}
10 changes: 0 additions & 10 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var (
// ErrRemoveNodeFromRGFailed = errors.New("failed to remove node from resource group")
// ErrTransferNodeFailed = errors.New("failed to transfer node between resource group")
// ErrTransferReplicaFailed = errors.New("failed to transfer replica between resource group")
// ErrListResourceGroupsFailed = errors.New("failed to list resource group")
// ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
// ErrLoadUseWrongRG = errors.New("load operation should use collection's resource group")
// ErrLoadWithDefaultRG = errors.New("load operation can't use default resource group and other resource group together")
)

func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
log.Ctx(ctx).Info("show collections request received", zap.Int64s("collections", req.GetCollectionIDs()))

Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,8 @@ func (suite *ServiceSuite) TestReleaseCollection() {

suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
suite.cluster.EXPECT().ReleaseCollection(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)

// Test release all collections
for _, collection := range suite.collections {
Expand Down
15 changes: 15 additions & 0 deletions internal/querycoordv2/session/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ReleaseSegments(ctx context.Context, nodeID int64, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error)
LoadPartitions(ctx context.Context, nodeID int64, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, nodeID int64, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, nodeID int64, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
GetDataDistribution(ctx context.Context, nodeID int64, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error)
GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error)
Expand Down Expand Up @@ -197,6 +198,20 @@
return status, err
}

// ReleaseCollection issues a ReleaseCollection RPC to the query node
// identified by nodeID.
//
// If c.send itself fails, its error is returned with a nil status. Otherwise
// the status and error produced by the node's ReleaseCollection call are
// returned unchanged.
//
// NOTE(review): req.Base is dereferenced without a nil check, so callers must
// supply a non-nil Base.
func (c *QueryCluster) ReleaseCollection(ctx context.Context, nodeID int64, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
var status *commonpb.Status
var err error
err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) {
// Work on a clone so that stamping TargetID does not mutate the caller's
// request, which may be reused for other nodes.
req := proto.Clone(req).(*querypb.ReleaseCollectionRequest)
req.Base.TargetID = nodeID
status, err = cli.ReleaseCollection(ctx, req)
})
// err1 reports a failure of c.send; err carries the RPC's own error.
if err1 != nil {
return nil, err1
}

Check warning on line 211 in internal/querycoordv2/session/cluster.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/session/cluster.go#L210-L211

Added lines #L210 - L211 were not covered by tests
return status, err
}

func (c *QueryCluster) GetDataDistribution(ctx context.Context, nodeID int64, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
var resp *querypb.GetDataDistributionResponse
var err error
Expand Down
24 changes: 24 additions & 0 deletions internal/querycoordv2/session/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (suite *ClusterTestSuite) createDefaultMockServer() querypb.QueryNodeServer
mock.Anything,
mock.AnythingOfType("*querypb.ReleasePartitionsRequest"),
).Maybe().Return(succStatus, nil)
svr.EXPECT().ReleaseCollection(
mock.Anything,
mock.AnythingOfType("*querypb.ReleaseCollectionRequest"),
).Maybe().Return(succStatus, nil)
svr.EXPECT().GetDataDistribution(
mock.Anything,
mock.AnythingOfType("*querypb.GetDataDistributionRequest"),
Expand Down Expand Up @@ -193,6 +197,10 @@ func (suite *ClusterTestSuite) createFailedMockServer() querypb.QueryNodeServer
mock.Anything,
mock.AnythingOfType("*querypb.ReleasePartitionsRequest"),
).Maybe().Return(failStatus, nil)
svr.EXPECT().ReleaseCollection(
mock.Anything,
mock.AnythingOfType("*querypb.ReleaseCollectionRequest"),
).Maybe().Return(failStatus, nil)
svr.EXPECT().GetDataDistribution(
mock.Anything,
mock.AnythingOfType("*querypb.GetDataDistributionRequest"),
Expand Down Expand Up @@ -318,6 +326,22 @@ func (suite *ClusterTestSuite) TestLoadAndReleasePartitions() {
suite.Equal("unexpected error", status.GetReason())
}

// TestReleaseCollection exercises QueryCluster.ReleaseCollection against two
// nodes: node 0, which is expected to answer with a success status, and node 1,
// which is expected to answer with an "unexpected error" status (presumably
// backed by the default and the failing mock servers respectively — confirm
// against the suite setup).
func (suite *ClusterTestSuite) TestReleaseCollection() {
	ctx := context.TODO()

	// Success path: the call must not error and the status must be OK.
	status, err := suite.cluster.ReleaseCollection(ctx, 0, &querypb.ReleaseCollectionRequest{
		Base: &commonpb.MsgBase{},
	})
	suite.NoError(err)
	// merr.Ok only returns a bool; it has to be asserted, otherwise the
	// success case verifies nothing about the returned status.
	suite.True(merr.Ok(status))

	// Failure path: the RPC itself succeeds, but the status carries the error.
	status, err = suite.cluster.ReleaseCollection(ctx, 1, &querypb.ReleaseCollectionRequest{
		Base: &commonpb.MsgBase{},
	})
	suite.NoError(err)
	suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
	suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestGetDataDistribution() {
ctx := context.TODO()
resp, err := suite.cluster.GetDataDistribution(ctx, 0, &querypb.GetDataDistributionRequest{
Expand Down
56 changes: 56 additions & 0 deletions internal/querycoordv2/session/mock_cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *DelegatorDataSuite) TearDownSuite() {
}

func (s *DelegatorDataSuite) genNormalCollection() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
s.manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *DelegatorDataSuite) genNormalCollection() {
}

func (s *DelegatorDataSuite) genCollectionWithFunction() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
s.manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down
6 changes: 3 additions & 3 deletions internal/querynodev2/delegator/delegator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *DelegatorSuite) SetupTest() {
}, nil)

// init schema
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
s.manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *DelegatorSuite) TearDownTest() {
func (s *DelegatorSuite) TestCreateDelegatorWithFunction() {
s.Run("init function failed", func() {
manager := segments.NewManager()
manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down Expand Up @@ -214,7 +214,7 @@ func (s *DelegatorSuite) TestCreateDelegatorWithFunction() {

s.Run("init function failed", func() {
manager := segments.NewManager()
manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delta_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *StreamingForwardSuite) SetupTest() {
}, nil)

// init schema
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
s.manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/local_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) {
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
}
suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata)
suite.node.manager.Collection.Put(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata)
suite.worker = NewLocalWorker(suite.node)
}

Expand Down
Loading
Loading