Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.4] Batch forward delete when using DirectForward (#37076) #37107

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,16 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr
return client.Delete(ctx, req)
})
}

// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest, _ ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(c.nodeID),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.DeleteBatchResponse, error) {
return client.DeleteBatch(ctx, req)
})
}
3 changes: 3 additions & 0 deletions internal/distributed/querynode/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func Test_NewClient(t *testing.T) {
r20, err := client.SearchSegments(ctx, nil)
retCheck(retNotNil, r20, err)

r21, err := client.DeleteBatch(ctx, nil)
retCheck(retNotNil, r21, err)

// stream rpc
client, err := client.QueryStream(ctx, nil)
retCheck(retNotNil, client, err)
Expand Down
6 changes: 6 additions & 0 deletions internal/distributed/querynode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,9 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu
func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
return s.querynode.Delete(ctx, req)
}

// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return s.querynode.DeleteBatch(ctx, req)
}
10 changes: 10 additions & 0 deletions internal/distributed/querynode/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -270,6 +271,15 @@ func Test_NewServer(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("DeleteBatch", func(t *testing.T) {
mockQN.EXPECT().DeleteBatch(mock.Anything, mock.Anything).Return(&querypb.DeleteBatchResponse{
Status: merr.Success(),
}, nil)

resp, err := server.DeleteBatch(ctx, &querypb.DeleteBatchRequest{})
assert.NoError(t, merr.CheckRPCCall(resp, err))
})

err = server.Stop()
assert.NoError(t, err)
}
Expand Down
59 changes: 59 additions & 0 deletions internal/mocks/mock_querynode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions internal/mocks/mock_querynode_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ service QueryNode {
}
rpc Delete(DeleteRequest) returns (common.Status) {
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
}

// --------------------QueryCoord grpc request and response proto------------------
Expand Down Expand Up @@ -772,6 +776,25 @@ message DeleteRequest {
DataScope scope = 8;
}

message DeleteBatchRequest {
common.MsgBase base = 1;
int64 collection_id = 2;
int64 partition_id = 3;
string vchannel_name = 4;
repeated int64 segment_ids = 5;
schema.IDs primary_keys = 6;
repeated uint64 timestamps = 7;
DataScope scope = 8;
}

// DeleteBatchResponse returns failed/missing segment ids
// cannot just using common.Status to handle partial failure logic
message DeleteBatchResponse {
common.Status status = 1;
repeated int64 failed_ids = 2;
repeated int64 missing_ids = 3;
}

message ActivateCheckerRequest {
common.MsgBase base = 1;
int32 checkerID = 2;
Expand Down
59 changes: 59 additions & 0 deletions internal/querycoordv2/mocks/mock_querynode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions internal/querynodev2/cluster/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading