diff --git a/go.mod b/go.mod index 766a359265af5..6854b54dc3d95 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 66de8bb5875c1..0c2c872203e5a 100644 --- a/go.sum +++ b/go.sum @@ -627,10 +627,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 h1:Pkz4jXqYQVY8nbZkY0+G4K5toZJaT4LxUrrYwGxwi3I= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index 9aabe096bcbbc..ea0cf349562f9 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -33,6 +33,7 @@ packages: github.com/milvus-io/milvus/internal/streamingnode/server/flusher: interfaces: Flusher: + FlushMsgHandler: github.com/milvus-io/milvus/internal/streamingnode/server/wal: interfaces: OpenerBuilder: @@ -44,11 +45,11 @@ packages: Interceptor: InterceptorWithReady: InterceptorBuilder: - github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector: - interfaces: + github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector: + interfaces: SealOperator: - github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector: - interfaces: + github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector: + interfaces: TimeTickSyncOperator: google.golang.org/grpc: interfaces: diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index dec075915c11d..3900884a1f1af 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1013,6 +1013,24 @@ func UpdateIsImporting(segmentID int64, isImporting bool) UpdateOperator { } } +// UpdateAsDroppedIfEmptyWhenFlushing updates segment state to Dropped if segment is empty and in Flushing state +// It's used to make a empty flushing segment to be dropped directly. +func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator { + return func(modPack *updateSegmentPack) bool { + segment := modPack.Get(segmentID) + if segment == nil { + log.Warn("meta update: update as dropped if empty when flusing failed - segment not found", + zap.Int64("segmentID", segmentID)) + return false + } + if segment.GetNumOfRows() == 0 && segment.GetState() == commonpb.SegmentState_Flushing { + log.Info("meta update: update as dropped if empty when flusing", zap.Int64("segmentID", segmentID)) + updateSegStateAndPrepareMetrics(segment, commonpb.SegmentState_Dropped, modPack.metricMutation) + } + return true + } +} + // updateSegmentsInfo update segment infos // will exec all operators, and update all changed segments func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 8492173123b7b..6043613a6c329 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -774,6 +774,20 @@ func TestUpdateSegmentsInfo(t *testing.T) { UpdateIsImporting(1, true), ) assert.NoError(t, err) + + err = meta.UpdateSegmentsInfo(UpdateAsDroppedIfEmptyWhenFlushing(1)) + assert.NoError(t, err) + }) + + t.Run("update empty segment into flush", func(t *testing.T) { + meta, err := newMemoryMeta() + assert.NoError(t, err) + meta.AddSegment(context.Background(), &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}}) + err = meta.UpdateSegmentsInfo( + UpdateStatusOperator(1, commonpb.SegmentState_Flushing), + UpdateAsDroppedIfEmptyWhenFlushing(1), + ) + assert.NoError(t, err) }) t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 2fd524fddde91..fdd7b9d7344a0 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -553,6 +553,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()), UpdateStartPosition(req.GetStartPositions()), UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()), + UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()), ) // Update segment info in memory and meta. diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 23c620181c58c..c5beef91f57f4 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -361,6 +361,7 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() { segments := map[int64]int64{ 0: 0, 1: 0, + 2: 0, } for segID, collID := range segments { info := &datapb.SegmentInfo{ @@ -445,6 +446,24 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() { s.EqualValues(segment.DmlPosition.ChannelName, "ch1") s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3}) s.EqualValues(segment.NumOfRows, 10) + + resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 2, + CollectionID: 0, + Channel: "ch1", + Field2BinlogPaths: []*datapb.FieldBinlog{}, + Field2StatslogPaths: []*datapb.FieldBinlog{}, + CheckPoints: []*datapb.CheckPoint{}, + Flushed: true, + }) + s.NoError(err) + s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success) + segment = s.testServer.meta.GetSegment(2) + s.NotNil(segment) + s.Equal(commonpb.SegmentState_Dropped, segment.GetState()) } func (s *ServerSuite) TestFlush_NormalCase() { diff --git a/internal/flushcommon/io/mock_binlogio.go b/internal/flushcommon/io/mock_binlogio.go index b0132f16299a7..6aa372a3ed9a0 100644 --- a/internal/flushcommon/io/mock_binlogio.go +++ b/internal/flushcommon/io/mock_binlogio.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package io @@ -25,6 +25,10 @@ func (_m *MockBinlogIO) EXPECT() *MockBinlogIO_Expecter { func (_m *MockBinlogIO) Download(ctx context.Context, paths []string) ([][]byte, error) { ret := _m.Called(ctx, paths) + if len(ret) == 0 { + panic("no return value specified for Download") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok { @@ -80,6 +84,10 @@ func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []s func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error { ret := _m.Called(ctx, kvs) + if len(ret) == 0 { + panic("no return value specified for Upload") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok { r0 = rf(ctx, kvs) diff --git a/internal/flushcommon/metacache/actions.go b/internal/flushcommon/metacache/actions.go index 98d9f00efacf5..302f1c14c3f50 100644 --- a/internal/flushcommon/metacache/actions.go +++ b/internal/flushcommon/metacache/actions.go @@ -129,6 +129,14 @@ func WithNoSyncingTask() SegmentFilter { type SegmentAction func(info *SegmentInfo) +func SegmentActions(actions ...SegmentAction) SegmentAction { + return func(info *SegmentInfo) { + for _, act := range actions { + act(info) + } + } +} + func UpdateState(state commonpb.SegmentState) SegmentAction { return func(info *SegmentInfo) { info.state = state @@ -147,6 +155,14 @@ func UpdateNumOfRows(numOfRows int64) SegmentAction { } } +func SetStartPositionIfNil(startPos *msgpb.MsgPosition) SegmentAction { + return func(info *SegmentInfo) { + if info.startPosition == nil { + info.startPosition = startPos + } + } +} + func UpdateBufferedRows(bufferedRows int64) SegmentAction { return func(info *SegmentInfo) { info.bufferRows = bufferedRows diff --git a/internal/flushcommon/metacache/actions_test.go b/internal/flushcommon/metacache/actions_test.go index 852ec09439f61..0513a9b251052 100644 --- a/internal/flushcommon/metacache/actions_test.go +++ b/internal/flushcommon/metacache/actions_test.go @@ -89,6 +89,13 @@ func (s *SegmentActionSuite) TestActions() { action = UpdateNumOfRows(numOfRows) action(info) s.Equal(numOfRows, info.NumOfRows()) + + info = &SegmentInfo{} + actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows)) + actions(info) + s.Equal(state, info.State()) + s.Equal(cp, info.Checkpoint()) + s.Equal(numOfRows, info.NumOfRows()) } func (s *SegmentActionSuite) TestMergeActions() { diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node.go b/internal/flushcommon/pipeline/flow_graph_dd_node.go index 14ddf9f7ebea0..2369ec28b3396 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node.go @@ -236,6 +236,19 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel). Add(float64(dmsg.GetNumRows())) fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg) + case commonpb.MsgType_CreateSegment: + createSegment := msg.(*adaptor.CreateSegmentMessageBody) + logger := log.With( + zap.String("vchannel", ddn.Name()), + zap.Int32("msgType", int32(msg.Type())), + zap.Uint64("timetick", createSegment.CreateSegmentMessage.TimeTick()), + ) + logger.Info("receive create segment message") + if err := ddn.flushMsgHandler.HandleCreateSegment(context.Background(), ddn.vChannelName, createSegment.CreateSegmentMessage); err != nil { + logger.Warn("handle create segment message failed", zap.Error(err)) + } else { + logger.Info("handle create segment message success") + } case commonpb.MsgType_FlushSegment: flushMsg := msg.(*adaptor.FlushMessageBody) logger := log.With( diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node_test.go b/internal/flushcommon/pipeline/flow_graph_dd_node_test.go index cda9b1c701a24..e3eadfb74b018 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node_test.go @@ -22,14 +22,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/datanode/compaction" + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_flusher" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -92,6 +97,56 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) { } } +func TestFlowGraph_DDNode_OperateFlush(t *testing.T) { + h := mock_flusher.NewMockFlushMsgHandler(t) + h.EXPECT().HandleCreateSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil) + h.EXPECT().HandleFlush(mock.Anything, mock.Anything).Return(nil) + h.EXPECT().HandleManualFlush(mock.Anything, mock.Anything).Return(nil) + + ddn := ddNode{ + ctx: context.Background(), + collectionID: 1, + vChannelName: "v1", + flushMsgHandler: h, + } + + mutableMsg, err := message.NewCreateSegmentMessageBuilderV2(). + WithHeader(&message.CreateSegmentMessageHeader{}). + WithBody(&message.CreateSegmentMessageBody{}). + WithVChannel("v1"). + BuildMutable() + assert.NoError(t, err) + immutableCreateSegmentMsg := mutableMsg.WithTimeTick(1).IntoImmutableMessage(mock_message.NewMockMessageID(t)) + + flushMsg, err := message.NewFlushMessageBuilderV2(). + WithHeader(&message.FlushMessageHeader{}). + WithBody(&message.FlushMessageBody{}). + WithVChannel("v1"). + BuildMutable() + assert.NoError(t, err) + immutableFlushMsg := flushMsg.WithTimeTick(2).IntoImmutableMessage(mock_message.NewMockMessageID(t)) + + manualFlushMsg, err := message.NewManualFlushMessageBuilderV2(). + WithHeader(&message.ManualFlushMessageHeader{}). + WithBody(&message.ManualFlushMessageBody{}). + WithVChannel("v1"). + BuildMutable() + assert.NoError(t, err) + immutableManualFlushMsg := manualFlushMsg.WithTimeTick(3).IntoImmutableMessage(mock_message.NewMockMessageID(t)) + + msg1, err := adaptor.NewCreateSegmentMessageBody(immutableCreateSegmentMsg) + assert.NoError(t, err) + msg2, err := adaptor.NewFlushMessageBody(immutableFlushMsg) + assert.NoError(t, err) + msg3, err := adaptor.NewManualFlushMessageBody(immutableManualFlushMsg) + assert.NoError(t, err) + + tsMessages := []msgstream.TsMsg{msg1, msg2, msg3} + var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) + outputMsgs := ddn.Operate([]Msg{msgStreamMsg}) + assert.NotNil(t, outputMsgs) +} + func TestFlowGraph_DDNode_Operate(t *testing.T) { t.Run("Test DDNode Operate DropCollection Msg", func(t *testing.T) { // invalid inputs diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go index 6945e21ff271f..2dafe7d0a450b 100644 --- a/internal/flushcommon/pipeline/mock_fgmanager.go +++ b/internal/flushcommon/pipeline/mock_fgmanager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package pipeline @@ -118,6 +118,10 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollectionIDs") + } + var r0 []int64 if rf, ok := ret.Get(0).(func() []int64); ok { r0 = rf() @@ -161,6 +165,10 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() [] func (_m *MockFlowgraphManager) GetFlowgraphCount() int { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphCount") + } + var r0 int if rf, ok := ret.Get(0).(func() int); ok { r0 = rf() @@ -202,6 +210,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphService") + } + var r0 *DataSyncService var r1 bool if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok { @@ -256,6 +268,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraph") + } + var r0 bool if rf, ok := ret.Get(0).(func(string) bool); ok { r0 = rf(channel) @@ -298,6 +314,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string) func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool { ret := _m.Called(channel, opID) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraphWithOpID") + } + var r0 bool if rf, ok := ret.Get(0).(func(string, int64) bool); ok { r0 = rf(channel, opID) diff --git a/internal/flushcommon/syncmgr/mock_meta_writer.go b/internal/flushcommon/syncmgr/mock_meta_writer.go index bacc91649a397..a5857cd3a0275 100644 --- a/internal/flushcommon/syncmgr/mock_meta_writer.go +++ b/internal/flushcommon/syncmgr/mock_meta_writer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -25,6 +25,10 @@ func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter { func (_m *MockMetaWriter) DropChannel(_a0 context.Context, _a1 string) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for DropChannel") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(_a0, _a1) @@ -68,6 +72,10 @@ func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(context.Context func (_m *MockMetaWriter) UpdateSync(_a0 context.Context, _a1 *SyncTask) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for UpdateSync") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok { r0 = rf(_a0, _a1) diff --git a/internal/flushcommon/syncmgr/mock_serializer.go b/internal/flushcommon/syncmgr/mock_serializer.go index fdbf8236994c5..03d05aa85574f 100644 --- a/internal/flushcommon/syncmgr/mock_serializer.go +++ b/internal/flushcommon/syncmgr/mock_serializer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -25,6 +25,10 @@ func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter { func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { ret := _m.Called(ctx, pack) + if len(ret) == 0 { + panic("no return value specified for EncodeBuffer") + } + var r0 Task var r1 error if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok { diff --git a/internal/flushcommon/syncmgr/mock_task.go b/internal/flushcommon/syncmgr/mock_task.go index 01a80a59c58a4..cc7494c0032b4 100644 --- a/internal/flushcommon/syncmgr/mock_task.go +++ b/internal/flushcommon/syncmgr/mock_task.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -26,6 +26,10 @@ func (_m *MockTask) EXPECT() *MockTask_Expecter { func (_m *MockTask) ChannelName() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for ChannelName") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -67,6 +71,10 @@ func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_C func (_m *MockTask) Checkpoint() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Checkpoint") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() @@ -143,6 +151,10 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han func (_m *MockTask) IsFlush() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsFlush") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -184,6 +196,10 @@ func (_c *MockTask_IsFlush_Call) RunAndReturn(run func() bool) *MockTask_IsFlush func (_m *MockTask) Run(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Run") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -226,6 +242,10 @@ func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *Mock func (_m *MockTask) SegmentID() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for SegmentID") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -267,6 +287,10 @@ func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_Segm func (_m *MockTask) StartPosition() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for StartPosition") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index 6f4606c1c7864..d45e9755aed49 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -248,7 +248,8 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B return nil, merr.WrapErrSegmentNotFound(pack.segmentID) } - return s.inCodec.SerializePkStatsList(lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { + // Allow to flush empty segment to make streaming service easier to implement rollback transaction. + stats := lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats { return &storage.PrimaryKeyStats{ FieldID: s.pkField.GetFieldID(), MaxPk: pks.MaxPK, @@ -257,7 +258,11 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B BF: pks.PkFilter, PkType: int64(s.pkField.GetDataType()), } - }), segment.NumOfRows()) + }) + if len(stats) == 0 { + return nil, nil + } + return s.inCodec.SerializePkStatsList(stats, segment.NumOfRows()) } func (s *storageV1Serializer) serializeMergedBM25Stats(pack *SyncPack) (map[int64]*storage.Blob, error) { @@ -267,8 +272,9 @@ func (s *storageV1Serializer) serializeMergedBM25Stats(pack *SyncPack) (map[int6 } stats := segment.GetBM25Stats() + // Allow to flush empty segment to make streaming service easier to implement rollback transaction. if stats == nil { - return nil, fmt.Errorf("searalize empty bm25 stats") + return nil, nil } fieldBytes, numRow, err := stats.Serialize() diff --git a/internal/flushcommon/writebuffer/manager.go b/internal/flushcommon/writebuffer/manager.go index f2d325a148dd3..38ec89f2d8211 100644 --- a/internal/flushcommon/writebuffer/manager.go +++ b/internal/flushcommon/writebuffer/manager.go @@ -24,6 +24,8 @@ import ( type BufferManager interface { // Register adds a WriteBuffer with provided schema & options. Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error + // CreateNewGrowingSegment notifies writeBuffer to create a new growing segment. + CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error // SealSegments notifies writeBuffer corresponding to provided channel to seal segments. // which will cause segment start flush procedure. SealSegments(ctx context.Context, channel string, segmentIDs []int64) error @@ -156,6 +158,22 @@ func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, return nil } +// CreateNewGrowingSegment notifies writeBuffer to create a new growing segment. +func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) error { + m.mut.RLock() + buf, ok := m.buffers[channel] + m.mut.RUnlock() + if !ok { + log.Ctx(ctx).Warn("write buffer not found when create new growing segment", + zap.String("channel", channel), + zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID)) + return merr.WrapErrChannelNotFound(channel) + } + buf.CreateNewGrowingSegment(partitionID, segmentID, nil) + return nil +} + // SealSegments call sync segment and change segments state to Flushed. func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { m.mut.RLock() diff --git a/internal/flushcommon/writebuffer/manager_test.go b/internal/flushcommon/writebuffer/manager_test.go index 63cb015657305..d344c48d83d7f 100644 --- a/internal/flushcommon/writebuffer/manager_test.go +++ b/internal/flushcommon/writebuffer/manager_test.go @@ -107,6 +107,25 @@ func (s *ManagerSuite) TestFlushSegments() { }) } +func (s *ManagerSuite) TestCreateNewGrowingSegment() { + manager := s.manager + err := manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1) + s.Error(err) + + s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once() + s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + + wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ + idAllocator: s.allocator, + }) + s.NoError(err) + s.manager.mut.Lock() + s.manager.buffers[s.channelName] = wb + s.manager.mut.Unlock() + err = manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1) + s.NoError(err) +} + func (s *ManagerSuite) TestBufferData() { manager := s.manager s.Run("channel_not_found", func() { diff --git a/internal/flushcommon/writebuffer/mock_manager.go b/internal/flushcommon/writebuffer/mock_manager.go index 4b9bde855779a..dc305f8aac247 100644 --- a/internal/flushcommon/writebuffer/mock_manager.go +++ b/internal/flushcommon/writebuffer/mock_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package writebuffer @@ -30,6 +30,10 @@ func (_m *MockBufferManager) EXPECT() *MockBufferManager_Expecter { func (_m *MockBufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { ret := _m.Called(channel, insertData, deleteMsgs, startPos, endPos) + if len(ret) == 0 { + panic("no return value specified for BufferData") + } + var r0 error if rf, ok := ret.Get(0).(func(string, []*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok { r0 = rf(channel, insertData, deleteMsgs, startPos, endPos) @@ -72,6 +76,55 @@ func (_c *MockBufferManager_BufferData_Call) RunAndReturn(run func(string, []*In return _c } +// CreateNewGrowingSegment provides a mock function with given fields: ctx, channel, partition, segmentID +func (_m *MockBufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error { + ret := _m.Called(ctx, channel, partition, segmentID) + + if len(ret) == 0 { + panic("no return value specified for CreateNewGrowingSegment") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64) error); ok { + r0 = rf(ctx, channel, partition, segmentID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockBufferManager_CreateNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateNewGrowingSegment' +type MockBufferManager_CreateNewGrowingSegment_Call struct { + *mock.Call +} + +// CreateNewGrowingSegment is a helper method to define mock.On call +// - ctx context.Context +// - channel string +// - partition int64 +// - segmentID int64 +func (_e *MockBufferManager_Expecter) CreateNewGrowingSegment(ctx interface{}, channel interface{}, partition interface{}, segmentID interface{}) *MockBufferManager_CreateNewGrowingSegment_Call { + return &MockBufferManager_CreateNewGrowingSegment_Call{Call: _e.mock.On("CreateNewGrowingSegment", ctx, channel, partition, segmentID)} +} + +func (_c *MockBufferManager_CreateNewGrowingSegment_Call) Run(run func(ctx context.Context, channel string, partition int64, segmentID int64)) *MockBufferManager_CreateNewGrowingSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int64), args[3].(int64)) + }) + return _c +} + +func (_c *MockBufferManager_CreateNewGrowingSegment_Call) Return(_a0 error) *MockBufferManager_CreateNewGrowingSegment_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBufferManager_CreateNewGrowingSegment_Call) RunAndReturn(run func(context.Context, string, int64, int64) error) *MockBufferManager_CreateNewGrowingSegment_Call { + _c.Call.Return(run) + return _c +} + // DropChannel provides a mock function with given fields: channel func (_m *MockBufferManager) DropChannel(channel string) { _m.Called(channel) @@ -143,6 +196,10 @@ func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, [ func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { ret := _m.Called(ctx, channel, flushTs) + if len(ret) == 0 { + panic("no return value specified for FlushChannel") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok { r0 = rf(ctx, channel, flushTs) @@ -187,6 +244,10 @@ func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Con func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for GetCheckpoint") + } + var r0 *msgpb.MsgPosition var r1 bool var r2 error @@ -289,6 +350,10 @@ func (_m *MockBufferManager) Register(channel string, _a1 metacache.MetaCache, o _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for Register") + } + var r0 error if rf, ok := ret.Get(0).(func(string, metacache.MetaCache, ...WriteBufferOption) error); ok { r0 = rf(channel, _a1, opts...) @@ -373,6 +438,10 @@ func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) * func (_m *MockBufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { ret := _m.Called(ctx, channel, segmentIDs) + if len(ret) == 0 { + panic("no return value specified for SealSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok { r0 = rf(ctx, channel, segmentIDs) diff --git a/internal/flushcommon/writebuffer/mock_write_buffer.go b/internal/flushcommon/writebuffer/mock_write_buffer.go index 93635c4178c28..7fcff704744dc 100644 --- a/internal/flushcommon/writebuffer/mock_write_buffer.go +++ b/internal/flushcommon/writebuffer/mock_write_buffer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package writebuffer @@ -28,6 +28,10 @@ func (_m *MockWriteBuffer) EXPECT() *MockWriteBuffer_Expecter { func (_m *MockWriteBuffer) BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { ret := _m.Called(insertMsgs, deleteMsgs, startPos, endPos) + if len(ret) == 0 { + panic("no return value specified for BufferData") + } + var r0 error if rf, ok := ret.Get(0).(func([]*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok { r0 = rf(insertMsgs, deleteMsgs, startPos, endPos) @@ -103,6 +107,41 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(context.Context, boo return _c } +// CreateNewGrowingSegment provides a mock function with given fields: partitionID, segmentID, startPos +func (_m *MockWriteBuffer) CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition) { + _m.Called(partitionID, segmentID, startPos) +} + +// MockWriteBuffer_CreateNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateNewGrowingSegment' +type MockWriteBuffer_CreateNewGrowingSegment_Call struct { + *mock.Call +} + +// CreateNewGrowingSegment is a helper method to define mock.On call +// - partitionID int64 +// - segmentID int64 +// - startPos *msgpb.MsgPosition +func (_e *MockWriteBuffer_Expecter) CreateNewGrowingSegment(partitionID interface{}, segmentID interface{}, startPos interface{}) *MockWriteBuffer_CreateNewGrowingSegment_Call { + return &MockWriteBuffer_CreateNewGrowingSegment_Call{Call: _e.mock.On("CreateNewGrowingSegment", partitionID, segmentID, startPos)} +} + +func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) Run(run func(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition)) *MockWriteBuffer_CreateNewGrowingSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition)) + }) + return _c +} + +func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) Return() *MockWriteBuffer_CreateNewGrowingSegment_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition)) *MockWriteBuffer_CreateNewGrowingSegment_Call { + _c.Call.Return(run) + return _c +} + // DropPartitions provides a mock function with given fields: partitionIDs func (_m *MockWriteBuffer) DropPartitions(partitionIDs []int64) { _m.Called(partitionIDs) @@ -186,6 +225,10 @@ func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy) func (_m *MockWriteBuffer) GetCheckpoint() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCheckpoint") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() @@ -229,6 +272,10 @@ func (_c *MockWriteBuffer_GetCheckpoint_Call) RunAndReturn(run func() *msgpb.Msg func (_m *MockWriteBuffer) GetFlushTimestamp() uint64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetFlushTimestamp") + } + var r0 uint64 if rf, ok := ret.Get(0).(func() uint64); ok { r0 = rf() @@ -270,6 +317,10 @@ func (_c *MockWriteBuffer_GetFlushTimestamp_Call) RunAndReturn(run func() uint64 func (_m *MockWriteBuffer) HasSegment(segmentID int64) bool { ret := _m.Called(segmentID) + if len(ret) == 0 { + panic("no return value specified for HasSegment") + } + var r0 bool if rf, ok := ret.Get(0).(func(int64) bool); ok { r0 = rf(segmentID) @@ -312,6 +363,10 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M func (_m *MockWriteBuffer) MemorySize() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for MemorySize") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -353,6 +408,10 @@ func (_c *MockWriteBuffer_MemorySize_Call) RunAndReturn(run func() int64) *MockW func (_m *MockWriteBuffer) SealSegments(ctx context.Context, segmentIDs []int64) error { ret := _m.Called(ctx, segmentIDs) + if len(ret) == 0 { + panic("no return value specified for SealSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { r0 = rf(ctx, segmentIDs) diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 1ad6fb66365aa..38f7e6ca168c3 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -37,6 +37,8 @@ const ( type WriteBuffer interface { // HasSegment checks whether certain segment exists in this buffer. HasSegment(segmentID int64) bool + // CreateNewGrowingSegment creates a new growing segment in the buffer. + CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition) // BufferData is the method to buffer dml data msgs. BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error // FlushTimestamp set flush timestamp for write buffer @@ -520,14 +522,13 @@ func (id *InsertData) batchPkExists(pks []storage.PrimaryKey, tss []uint64, hits return hits } -// bufferInsert function InsertMsg into bufferred InsertData and returns primary key field data for future usage. -func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *msgpb.MsgPosition) error { - _, ok := wb.metaCache.GetSegmentByID(inData.segmentID) +func (wb *writeBufferBase) CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition) { + _, ok := wb.metaCache.GetSegmentByID(segmentID) // new segment if !ok { wb.metaCache.AddSegment(&datapb.SegmentInfo{ - ID: inData.segmentID, - PartitionID: inData.partitionID, + ID: segmentID, + PartitionID: partitionID, CollectionID: wb.collectionID, InsertChannel: wb.channelName, StartPosition: startPos, @@ -535,14 +536,20 @@ func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *ms }, func(_ *datapb.SegmentInfo) pkoracle.PkStat { return pkoracle.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize()) }, metacache.NewBM25StatsFactory, metacache.SetStartPosRecorded(false)) - log.Info("add growing segment", zap.Int64("segmentID", inData.segmentID), zap.String("channel", wb.channelName)) + log.Info("add growing segment", zap.Int64("segmentID", segmentID), zap.String("channel", wb.channelName)) } +} +// bufferInsert function InsertMsg into bufferred InsertData and returns primary key field data for future usage. +func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *msgpb.MsgPosition) error { + wb.CreateNewGrowingSegment(inData.partitionID, inData.segmentID, startPos) segBuf := wb.getOrCreateBuffer(inData.segmentID) totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos) - wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows), - metacache.WithSegmentIDs(inData.segmentID)) + wb.metaCache.UpdateSegments(metacache.SegmentActions( + metacache.UpdateNumOfRows(segBuf.insertBuffer.rows), + metacache.SetStartPositionIfNil(startPos), + ), metacache.WithSegmentIDs(inData.segmentID)) metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize)) diff --git a/internal/mocks/streamingnode/server/mock_flusher/mock_FlushMsgHandler.go b/internal/mocks/streamingnode/server/mock_flusher/mock_FlushMsgHandler.go new file mode 100644 index 0000000000000..b2e6662497f65 --- /dev/null +++ b/internal/mocks/streamingnode/server/mock_flusher/mock_FlushMsgHandler.go @@ -0,0 +1,180 @@ +// Code generated by mockery v2.46.0. DO NOT EDIT. + +package mock_flusher + +import ( + context "context" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + + mock "github.com/stretchr/testify/mock" +) + +// MockFlushMsgHandler is an autogenerated mock type for the FlushMsgHandler type +type MockFlushMsgHandler struct { + mock.Mock +} + +type MockFlushMsgHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *MockFlushMsgHandler) EXPECT() *MockFlushMsgHandler_Expecter { + return &MockFlushMsgHandler_Expecter{mock: &_m.Mock} +} + +// HandleCreateSegment provides a mock function with given fields: ctx, vchannel, createSegmentMsg +func (_m *MockFlushMsgHandler) HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error { + ret := _m.Called(ctx, vchannel, createSegmentMsg) + + if len(ret) == 0 { + panic("no return value specified for HandleCreateSegment") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, message.ImmutableCreateSegmentMessageV2) error); ok { + r0 = rf(ctx, vchannel, createSegmentMsg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockFlushMsgHandler_HandleCreateSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleCreateSegment' +type MockFlushMsgHandler_HandleCreateSegment_Call struct { + *mock.Call +} + +// HandleCreateSegment is a helper method to define mock.On call +// - ctx context.Context +// - vchannel string +// - createSegmentMsg message.specializedImmutableMessage[*messagespb.CreateSegmentMessageHeader,*messagespb.CreateSegmentMessageBody] +func (_e *MockFlushMsgHandler_Expecter) HandleCreateSegment(ctx interface{}, vchannel interface{}, createSegmentMsg interface{}) *MockFlushMsgHandler_HandleCreateSegment_Call { + return &MockFlushMsgHandler_HandleCreateSegment_Call{Call: _e.mock.On("HandleCreateSegment", ctx, vchannel, createSegmentMsg)} +} + +func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) Run(run func(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2)) *MockFlushMsgHandler_HandleCreateSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(message.ImmutableCreateSegmentMessageV2)) + }) + return _c +} + +func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) Return(_a0 error) *MockFlushMsgHandler_HandleCreateSegment_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) RunAndReturn(run func(context.Context, string, message.ImmutableCreateSegmentMessageV2) error) *MockFlushMsgHandler_HandleCreateSegment_Call { + _c.Call.Return(run) + return _c +} + +// HandleFlush provides a mock function with given fields: vchannel, flushMsg +func (_m *MockFlushMsgHandler) HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error { + ret := _m.Called(vchannel, flushMsg) + + if len(ret) == 0 { + panic("no return value specified for HandleFlush") + } + + var r0 error + if rf, ok := ret.Get(0).(func(string, message.ImmutableFlushMessageV2) error); ok { + r0 = rf(vchannel, flushMsg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockFlushMsgHandler_HandleFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleFlush' +type MockFlushMsgHandler_HandleFlush_Call struct { + *mock.Call +} + +// HandleFlush is a helper method to define mock.On call +// - vchannel string +// - flushMsg message.specializedImmutableMessage[*messagespb.FlushMessageHeader,*messagespb.FlushMessageBody] +func (_e *MockFlushMsgHandler_Expecter) HandleFlush(vchannel interface{}, flushMsg interface{}) *MockFlushMsgHandler_HandleFlush_Call { + return &MockFlushMsgHandler_HandleFlush_Call{Call: _e.mock.On("HandleFlush", vchannel, flushMsg)} +} + +func (_c *MockFlushMsgHandler_HandleFlush_Call) Run(run func(vchannel string, flushMsg message.ImmutableFlushMessageV2)) *MockFlushMsgHandler_HandleFlush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(message.ImmutableFlushMessageV2)) + }) + return _c +} + +func (_c *MockFlushMsgHandler_HandleFlush_Call) Return(_a0 error) *MockFlushMsgHandler_HandleFlush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlushMsgHandler_HandleFlush_Call) RunAndReturn(run func(string, message.ImmutableFlushMessageV2) error) *MockFlushMsgHandler_HandleFlush_Call { + _c.Call.Return(run) + return _c +} + +// HandleManualFlush provides a mock function with given fields: vchannel, flushMsg +func (_m *MockFlushMsgHandler) HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error { + ret := _m.Called(vchannel, flushMsg) + + if len(ret) == 0 { + panic("no return value specified for HandleManualFlush") + } + + var r0 error + if rf, ok := ret.Get(0).(func(string, message.ImmutableManualFlushMessageV2) error); ok { + r0 = rf(vchannel, flushMsg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockFlushMsgHandler_HandleManualFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleManualFlush' +type MockFlushMsgHandler_HandleManualFlush_Call struct { + *mock.Call +} + +// HandleManualFlush is a helper method to define mock.On call +// - vchannel string +// - flushMsg message.specializedImmutableMessage[*messagespb.ManualFlushMessageHeader,*messagespb.ManualFlushMessageBody] +func (_e *MockFlushMsgHandler_Expecter) HandleManualFlush(vchannel interface{}, flushMsg interface{}) *MockFlushMsgHandler_HandleManualFlush_Call { + return &MockFlushMsgHandler_HandleManualFlush_Call{Call: _e.mock.On("HandleManualFlush", vchannel, flushMsg)} +} + +func (_c *MockFlushMsgHandler_HandleManualFlush_Call) Run(run func(vchannel string, flushMsg message.ImmutableManualFlushMessageV2)) *MockFlushMsgHandler_HandleManualFlush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(message.ImmutableManualFlushMessageV2)) + }) + return _c +} + +func (_c *MockFlushMsgHandler_HandleManualFlush_Call) Return(_a0 error) *MockFlushMsgHandler_HandleManualFlush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlushMsgHandler_HandleManualFlush_Call) RunAndReturn(run func(string, message.ImmutableManualFlushMessageV2) error) *MockFlushMsgHandler_HandleManualFlush_Call { + _c.Call.Return(run) + return _c +} + +// NewMockFlushMsgHandler creates a new instance of MockFlushMsgHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockFlushMsgHandler(t interface { + mock.TestingT + Cleanup(func()) +}) *MockFlushMsgHandler { + mock := &MockFlushMsgHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proxy/task_insert_streaming.go b/internal/proxy/task_insert_streaming.go index a452816f12c02..0a99d9816844e 100644 --- a/internal/proxy/task_insert_streaming.go +++ b/internal/proxy/task_insert_streaming.go @@ -97,6 +97,7 @@ func repackInsertDataForStreamingService( return nil, err } for _, msg := range msgs { + insertRequest := msg.(*msgstream.InsertMsg).InsertRequest newMsg, err := message.NewInsertMessageBuilderV1(). WithVChannel(channel). WithHeader(&message.InsertMessageHeader{ @@ -104,12 +105,12 @@ func repackInsertDataForStreamingService( Partitions: []*message.PartitionSegmentAssignment{ { PartitionId: partitionID, - Rows: uint64(len(rowOffsets)), + Rows: insertRequest.GetNumRows(), BinarySize: 0, // TODO: current not used, message estimate size is used. }, }, }). - WithBody(msg.(*msgstream.InsertMsg).InsertRequest). + WithBody(insertRequest). BuildMutable() if err != nil { return nil, err @@ -175,6 +176,7 @@ func repackInsertDataWithPartitionKeyForStreamingService( return nil, err } for _, msg := range msgs { + insertRequest := msg.(*msgstream.InsertMsg).InsertRequest newMsg, err := message.NewInsertMessageBuilderV1(). WithVChannel(channel). WithHeader(&message.InsertMessageHeader{ @@ -182,12 +184,12 @@ func repackInsertDataWithPartitionKeyForStreamingService( Partitions: []*message.PartitionSegmentAssignment{ { PartitionId: partitionIDs[partitionName], - Rows: uint64(len(rowOffsets)), + Rows: insertRequest.GetNumRows(), BinarySize: 0, // TODO: current not used, message estimate size is used. }, }, }). - WithBody(msg.(*msgstream.InsertMsg).InsertRequest). + WithBody(insertRequest). BuildMutable() if err != nil { return nil, err diff --git a/internal/streamingnode/server/flusher/flusherimpl/flushmsg_handler_impl.go b/internal/streamingnode/server/flusher/flusherimpl/flushmsg_handler_impl.go index 7993495acb36e..4e05250cb37e6 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flushmsg_handler_impl.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flushmsg_handler_impl.go @@ -35,6 +35,19 @@ type flushMsgHandlerImpl struct { wbMgr writebuffer.BufferManager } +func (impl *flushMsgHandlerImpl) HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error { + body, err := createSegmentMsg.Body() + if err != nil { + return errors.Wrap(err, "failed to get create segment message body") + } + for _, segmentInfo := range body.GetSegments() { + if err := impl.wbMgr.CreateNewGrowingSegment(ctx, vchannel, segmentInfo.GetPartitionId(), segmentInfo.GetSegmentId()); err != nil { + return err + } + } + return nil +} + func (impl *flushMsgHandlerImpl) HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error { body, err := flushMsg.Body() if err != nil { diff --git a/internal/streamingnode/server/flusher/flushmsg_handler.go b/internal/streamingnode/server/flusher/flushmsg_handler.go index 5df71680e02cf..b070c058cf198 100644 --- a/internal/streamingnode/server/flusher/flushmsg_handler.go +++ b/internal/streamingnode/server/flusher/flushmsg_handler.go @@ -16,9 +16,15 @@ package flusher -import "github.com/milvus-io/milvus/pkg/streaming/util/message" +import ( + "context" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) type FlushMsgHandler interface { + HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error + HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error diff --git a/internal/streamingnode/server/wal/adaptor/builder.go b/internal/streamingnode/server/wal/adaptor/builder.go index 6190fca4909df..70e629ce7d6fe 100644 --- a/internal/streamingnode/server/wal/adaptor/builder.go +++ b/internal/streamingnode/server/wal/adaptor/builder.go @@ -4,6 +4,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/ddl" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/redo" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick" "github.com/milvus-io/milvus/pkg/streaming/walimpls" @@ -32,6 +33,7 @@ func (b builderAdaptorImpl) Build() (wal.Opener, error) { } // Add all interceptor here. return adaptImplsToOpener(o, []interceptors.InterceptorBuilder{ + redo.NewInterceptorBuilder(), timetick.NewInterceptorBuilder(), segment.NewInterceptorBuilder(), ddl.NewInterceptorBuilder(), diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index b6d654f3b42bd..eaf0550f594b2 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -4,6 +4,7 @@ import ( "context" "go.uber.org/zap" + "google.golang.org/protobuf/types/known/anypb" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" @@ -115,13 +116,20 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) metricsGuard.Finish(err) return nil, err } + var extra *anypb.Any + if extraAppendResult.Extra != nil { + var err error + if extra, err = anypb.New(extraAppendResult.Extra); err != nil { + panic("unreachable: failed to marshal extra append result") + } + } // unwrap the messageID if needed. r := &wal.AppendResult{ MessageID: messageID, TimeTick: extraAppendResult.TimeTick, TxnCtx: extraAppendResult.TxnCtx, - Extra: extraAppendResult.Extra, + Extra: extra, } metricsGuard.Finish(nil) return r, nil diff --git a/internal/streamingnode/server/wal/interceptors/redo/builder.go b/internal/streamingnode/server/wal/interceptors/redo/builder.go new file mode 100644 index 0000000000000..13a04479aa476 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/redo/builder.go @@ -0,0 +1,16 @@ +package redo + +import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + +// NewInterceptorBuilder creates a new redo interceptor builder. +func NewInterceptorBuilder() interceptors.InterceptorBuilder { + return &interceptorBuilder{} +} + +// interceptorBuilder is the builder for redo interceptor. +type interceptorBuilder struct{} + +// Build creates a new redo interceptor. +func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor { + return &redoAppendInterceptor{} +} diff --git a/internal/streamingnode/server/wal/interceptors/redo/redo_interceptor.go b/internal/streamingnode/server/wal/interceptors/redo/redo_interceptor.go new file mode 100644 index 0000000000000..b9eab165ee9cd --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/redo/redo_interceptor.go @@ -0,0 +1,34 @@ +package redo + +import ( + "context" + "errors" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +var ( + _ interceptors.Interceptor = (*redoAppendInterceptor)(nil) + ErrRedo = errors.New("redo") +) + +// redoAppendInterceptor is an append interceptor to retry the append operation if needed. +// It's useful when the append operation want to refresh the append context (such as timetick belong to the message) +type redoAppendInterceptor struct{} + +func (r *redoAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + msgID, err = append(ctx, msg) + // If the error is ErrRedo, we should redo the append operation. + if errors.Is(err, ErrRedo) { + continue + } + return msgID, err + } +} + +func (r *redoAppendInterceptor) Close() {} diff --git a/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go b/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go index 32ee6b8299185..de8ed3e119b2f 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go +++ b/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go @@ -55,8 +55,8 @@ func (s *sealOperationInspectorImpl) TriggerSealWaited(ctx context.Context, pcha } } -// RegsiterPChannelManager implements SealInspector.RegsiterPChannelManager. -func (s *sealOperationInspectorImpl) RegsiterPChannelManager(m SealOperator) { +// RegisterPChannelManager implements SealInspector.RegisterPChannelManager. +func (s *sealOperationInspectorImpl) RegisterPChannelManager(m SealOperator) { _, loaded := s.managers.GetOrInsert(m.Channel().Name, m) if loaded { panic("pchannel manager already exists, critical bug in code") diff --git a/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector.go b/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector.go index caa1e4155fbaf..7fb77b665a92f 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector.go +++ b/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector.go @@ -27,7 +27,7 @@ type SealOperationInspector interface { TriggerSealWaited(ctx context.Context, pchannel string) error // RegisterPChannelManager registers a pchannel manager. - RegsiterPChannelManager(m SealOperator) + RegisterPChannelManager(m SealOperator) // UnregisterPChannelManager unregisters a pchannel manager. UnregisterPChannelManager(m SealOperator) diff --git a/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector_test.go b/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector_test.go index d795cc19cb524..3876341988108 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/inspector/inspector_test.go @@ -39,7 +39,7 @@ func TestSealedInspector(t *testing.T) { return ops.Load()%2 == 0 }) - inspector.RegsiterPChannelManager(o) + inspector.RegisterPChannelManager(o) wg := sync.WaitGroup{} wg.Add(2) go func() { diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go index fa80d09424e94..def30b9575115 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go @@ -10,19 +10,23 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" - "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/proto/messagespb" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) var ErrFencedAssign = errors.New("fenced assign") // newPartitionSegmentManager creates a new partition segment assign manager. func newPartitionSegmentManager( + wal *syncutil.Future[wal.WAL], pchannel types.PChannelInfo, vchannel string, collectionID int64, @@ -37,6 +41,7 @@ func newPartitionSegmentManager( zap.String("vchannel", vchannel), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", paritionID)), + wal: wal, pchannel: pchannel, vchannel: vchannel, collectionID: collectionID, @@ -50,6 +55,7 @@ func newPartitionSegmentManager( type partitionSegmentManager struct { mu sync.Mutex logger *log.MLogger + wal *syncutil.Future[wal.WAL] pchannel types.PChannelInfo vchannel string collectionID int64 @@ -72,17 +78,20 @@ func (m *partitionSegmentManager) AssignSegment(ctx context.Context, req *Assign // So it's just a promise check here. // If the request time tick is less than the fenced time tick, the assign operation is fenced. // A special error will be returned to indicate the assign operation is fenced. - // The wal will retry it with new timetick. if req.TimeTick <= m.fencedAssignTimeTick { return nil, ErrFencedAssign } return m.assignSegment(ctx, req) } -// SealAllSegmentsAndFenceUntil seals all segments and fence assign until the maximum of timetick or max time tick. -func (m *partitionSegmentManager) SealAllSegmentsAndFenceUntil(timeTick uint64) (sealedSegments []*segmentAllocManager) { +// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick. +func (m *partitionSegmentManager) SealAndFenceSegmentUntil(timeTick uint64) (sealedSegments []*segmentAllocManager) { m.mu.Lock() defer m.mu.Unlock() + // no-op if the incoming time tick is less than the fenced time tick. + if timeTick <= m.fencedAssignTimeTick { + return + } segmentManagers := m.collectShouldBeSealedWithPolicy(func(segmentMeta *segmentAllocManager) (policy.PolicyName, bool) { return policy.PolicyNameFenced, true }) // fence the assign operation until the incoming time tick or latest assigned timetick. @@ -225,6 +234,27 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (* if err := merr.CheckRPCCall(resp, err); err != nil { return nil, errors.Wrap(err, "failed to alloc growing segment at datacoord") } + msg, err := message.NewCreateSegmentMessageBuilderV2(). + WithVChannel(pendingSegment.GetVChannel()). + WithHeader(&message.CreateSegmentMessageHeader{}). + WithBody(&message.CreateSegmentMessageBody{ + CollectionId: pendingSegment.GetCollectionID(), + Segments: []*messagespb.CreateSegmentInfo{{ + // We only execute one segment creation operation at a time. + // But in future, we need to modify the segment creation operation to support batch creation. + // Because the partition-key based collection may create huge amount of segments at the same time. + PartitionId: pendingSegment.GetPartitionID(), + SegmentId: pendingSegment.GetSegmentID(), + }}, + }).BuildMutable() + if err != nil { + return nil, errors.Wrapf(err, "failed to create new segment message, segmentID: %d", pendingSegment.GetSegmentID()) + } + // Send CreateSegmentMessage into wal. + msgID, err := m.wal.Get().Append(ctx, msg) + if err != nil { + return nil, errors.Wrapf(err, "failed to send create segment message into wal, segmentID: %d", pendingSegment.GetSegmentID()) + } // Getnerate growing segment limitation. limitation := policy.GetSegmentLimitationPolicy().GenerateLimitation() @@ -232,13 +262,14 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (* // Commit it into streaming node meta. // growing segment can be assigned now. tx := pendingSegment.BeginModification() - tx.IntoGrowing(&limitation) + tx.IntoGrowing(&limitation, msgID.TimeTick) if err := tx.Commit(ctx); err != nil { return nil, errors.Wrapf(err, "failed to commit modification of segment assignment into growing, segmentID: %d", pendingSegment.GetSegmentID()) } - m.logger.Info( - "generate new growing segment", + m.logger.Info("generate new growing segment", zap.Int64("segmentID", pendingSegment.GetSegmentID()), + zap.String("messageID", msgID.MessageID.String()), + zap.Uint64("timetick", msgID.TimeTick), zap.String("limitationPolicy", limitation.PolicyName), zap.Uint64("segmentBinarySize", limitation.SegmentSize), zap.Any("extraInfo", limitation.ExtraInfo), @@ -280,21 +311,34 @@ func (m *partitionSegmentManager) createNewPendingSegment(ctx context.Context) ( // assignSegment assigns a segment for a assign segment request and return should trigger a seal operation. func (m *partitionSegmentManager) assignSegment(ctx context.Context, req *AssignSegmentRequest) (*AssignSegmentResult, error) { - // Alloc segment for insert at previous segments. + hitTimeTickTooOld := false + // Alloc segment for insert at allocated segments. for _, segment := range m.segments { - inserted, ack := segment.AllocRows(ctx, req) - if inserted { - return &AssignSegmentResult{SegmentID: segment.GetSegmentID(), Acknowledge: ack}, nil + result, err := segment.AllocRows(ctx, req) + if err == nil { + return result, nil + } + if errors.IsAny(err, ErrTooLargeInsert) { + // Return error directly. + // If the insert message is too large to hold by single segment, it can not be inserted anymore. + return nil, err + } + if errors.Is(err, ErrTimeTickTooOld) { + hitTimeTickTooOld = true } } + // If the timetick is too old for existing segment, it can not be inserted even allocate new growing segment, + // (new growing segment's timetick is always greater than the old gorwing segmet's timetick). + // Return directly to avoid unnecessary growing segment allocation. + if hitTimeTickTooOld { + return nil, ErrTimeTickTooOld + } + // If not inserted, ask a new growing segment to insert. newGrowingSegment, err := m.allocNewGrowingSegment(ctx) if err != nil { return nil, err } - if inserted, ack := newGrowingSegment.AllocRows(ctx, req); inserted { - return &AssignSegmentResult{SegmentID: newGrowingSegment.GetSegmentID(), Acknowledge: ack}, nil - } - return nil, status.NewUnrecoverableError("too large insert message, cannot hold in empty growing segment, stats: %+v", req.InsertMetrics) + return newGrowingSegment.AllocRows(ctx, req) } diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go index 00f171ccffa19..c4269cc8636c7 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go @@ -7,17 +7,20 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) // buildNewPartitionManagers builds new partition managers. func buildNewPartitionManagers( + wal *syncutil.Future[wal.WAL], pchannel types.PChannelInfo, rawMetas []*streamingpb.SegmentAssignmentMeta, collectionInfos []*rootcoordpb.CollectionInfoOnPChannel, @@ -62,6 +65,7 @@ func buildNewPartitionManagers( } // otherwise, just create a new manager. _, ok := managers.GetOrInsert(partition.GetPartitionId(), newPartitionSegmentManager( + wal, pchannel, collectionInfo.GetVchannel(), collectionID, @@ -77,6 +81,7 @@ func buildNewPartitionManagers( m := &partitionSegmentManagers{ mu: sync.Mutex{}, logger: log.With(zap.Any("pchannel", pchannel)), + wal: wal, pchannel: pchannel, managers: managers, collectionInfos: collectionInfoMap, @@ -91,6 +96,7 @@ type partitionSegmentManagers struct { mu sync.Mutex logger *log.MLogger + wal *syncutil.Future[wal.WAL] pchannel types.PChannelInfo managers *typeutil.ConcurrentMap[int64, *partitionSegmentManager] // map partitionID to partition manager collectionInfos map[int64]*rootcoordpb.CollectionInfoOnPChannel // map collectionID to collectionInfo @@ -112,6 +118,7 @@ func (m *partitionSegmentManagers) NewCollection(collectionID int64, vchannel st m.collectionInfos[collectionID] = newCollectionInfo(collectionID, vchannel, partitionID) for _, partitionID := range partitionID { if _, loaded := m.managers.GetOrInsert(partitionID, newPartitionSegmentManager( + m.wal, m.pchannel, vchannel, collectionID, @@ -149,6 +156,7 @@ func (m *partitionSegmentManagers) NewPartition(collectionID int64, partitionID }) if _, loaded := m.managers.GetOrInsert(partitionID, newPartitionSegmentManager( + m.wal, m.pchannel, m.collectionInfos[collectionID].Vchannel, collectionID, @@ -255,8 +263,8 @@ func (m *partitionSegmentManagers) RemovePartition(collectionID int64, partition return segments } -// SealAllSegmentsAndFenceUntil seals all segments and fence assign until timetick. -func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int64, timetick uint64) ([]*segmentAllocManager, error) { +// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick. +func (m *partitionSegmentManagers) SealAndFenceSegmentUntil(collectionID int64, timetick uint64) ([]*segmentAllocManager, error) { m.mu.Lock() defer m.mu.Unlock() @@ -278,7 +286,7 @@ func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int zap.Int64("partitionID", partition.PartitionId)) return nil, errors.New("partition not found") } - newSealedSegments := pm.SealAllSegmentsAndFenceUntil(timetick) + newSealedSegments := pm.SealAndFenceSegmentUntil(timetick) for _, segment := range newSealedSegments { segmentIDs = append(segmentIDs, segment.GetSegmentID()) } diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go index d2df8ff35fc94..d34778b90048a 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go @@ -39,7 +39,7 @@ func RecoverPChannelSegmentAllocManager( return nil, errors.Wrap(err, "failed to get pchannel info from rootcoord") } metrics := metricsutil.NewSegmentAssignMetrics(pchannel.Name) - managers, waitForSealed := buildNewPartitionManagers(pchannel, rawMetas, resp.GetCollections(), metrics) + managers, waitForSealed := buildNewPartitionManagers(wal, pchannel, rawMetas, resp.GetCollections(), metrics) // PChannelSegmentAllocManager is the segment assign manager of determined pchannel. logger := log.With(zap.Any("pchannel", pchannel)) @@ -143,8 +143,8 @@ func (m *PChannelSegmentAllocManager) RemovePartition(ctx context.Context, colle return m.helper.WaitUntilNoWaitSeal(ctx) } -// SealAllSegmentsAndFenceUntil seals all segments and fence assign until timetick and return the segmentIDs. -func (m *PChannelSegmentAllocManager) SealAllSegmentsAndFenceUntil(ctx context.Context, collectionID int64, timetick uint64) ([]int64, error) { +// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick. +func (m *PChannelSegmentAllocManager) SealAndFenceSegmentUntil(ctx context.Context, collectionID int64, timetick uint64) ([]int64, error) { if err := m.checkLifetime(); err != nil { return nil, err } @@ -152,7 +152,7 @@ func (m *PChannelSegmentAllocManager) SealAllSegmentsAndFenceUntil(ctx context.C // All message's timetick less than incoming timetick is all belong to the output sealed segment. // So the output sealed segment transfer into flush == all message's timetick less than incoming timetick are flushed. - sealedSegments, err := m.managers.SealAllSegmentsAndFenceUntil(collectionID, timetick) + sealedSegments, err := m.managers.SealAndFenceSegmentUntil(collectionID, timetick) if err != nil { return nil, err } diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go index 4bd6ad48b7ec9..33597cce87c25 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/txn" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" @@ -32,7 +33,10 @@ func TestSegmentAllocManager(t *testing.T) { initializeTestState(t) w := mock_wal.NewMockWAL(t) - w.EXPECT().Append(mock.Anything, mock.Anything).Return(nil, nil) + w.EXPECT().Append(mock.Anything, mock.Anything).Return(&wal.AppendResult{ + MessageID: rmq.NewRmqID(1), + TimeTick: 2, + }, nil) f := syncutil.NewFuture[wal.WAL]() f.Set(w) @@ -42,8 +46,21 @@ func TestSegmentAllocManager(t *testing.T) { ctx := context.Background() - // Ask for allocate segment + // Ask for a too old timetick. result, err := m.AssignSegment(ctx, &AssignSegmentRequest{ + CollectionID: 1, + PartitionID: 1, + InsertMetrics: stats.InsertMetrics{ + Rows: 100, + BinarySize: 100, + }, + TimeTick: 1, + }) + assert.Nil(t, result) + assert.ErrorIs(t, err, ErrTimeTickTooOld) + + // Ask for allocate segment + result, err = m.AssignSegment(ctx, &AssignSegmentRequest{ CollectionID: 1, PartitionID: 1, InsertMetrics: stats.InsertMetrics{ @@ -163,7 +180,7 @@ func TestSegmentAllocManager(t *testing.T) { ts := tsoutil.GetCurrentTime() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() - ids, err := m.SealAllSegmentsAndFenceUntil(ctx, 1, ts) + ids, err := m.SealAndFenceSegmentUntil(ctx, 1, ts) assert.Error(t, err) assert.ErrorIs(t, err, context.DeadlineExceeded) assert.Empty(t, ids) @@ -190,14 +207,26 @@ func TestCreateAndDropCollection(t *testing.T) { initializeTestState(t) w := mock_wal.NewMockWAL(t) - w.EXPECT().Append(mock.Anything, mock.Anything).Return(nil, nil) + w.EXPECT().Append(mock.Anything, mock.Anything).Return(&wal.AppendResult{ + MessageID: rmq.NewRmqID(1), + TimeTick: 1, + }, nil) f := syncutil.NewFuture[wal.WAL]() f.Set(w) m, err := RecoverPChannelSegmentAllocManager(context.Background(), types.PChannelInfo{Name: "v1"}, f) assert.NoError(t, err) assert.NotNil(t, m) - inspector.GetSegmentSealedInspector().RegsiterPChannelManager(m) + + m.MustSealSegments(context.Background(), stats.SegmentBelongs{ + PChannel: "v1", + VChannel: "v1", + CollectionID: 1, + PartitionID: 2, + SegmentID: 4000, + }) + + inspector.GetSegmentSealedInspector().RegisterPChannelManager(m) ctx := context.Background() diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go index a50e83903cde0..796dbe8034101 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/cockroachdb/errors" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -19,6 +20,13 @@ import ( const dirtyThreshold = 30 * 1024 * 1024 // 30MB +var ( + ErrSegmentNotGrowing = errors.New("segment is not growing") + ErrTimeTickTooOld = errors.New("time tick is too old") + ErrNotEnoughSpace = stats.ErrNotEnoughSpace + ErrTooLargeInsert = stats.ErrTooLargeInsert +) + // newSegmentAllocManagerFromProto creates a new segment assignment meta from proto. func newSegmentAllocManagerFromProto( pchannel types.PChannelInfo, @@ -161,14 +169,20 @@ func (s *segmentAllocManager) TxnSem() int32 { // AllocRows ask for rows from current segment. // Only growing and not fenced segment can alloc rows. -func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentRequest) (bool, *atomic.Int32) { +func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentRequest) (*AssignSegmentResult, error) { // if the segment is not growing or reach limit, return false directly. if s.inner.GetState() != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING { - return false, nil + return nil, ErrSegmentNotGrowing } - inserted := resource.Resource().SegmentAssignStatsManager().AllocRows(s.GetSegmentID(), req.InsertMetrics) - if !inserted { - return false, nil + if req.TimeTick <= s.inner.GetStat().CreateSegmentTimeTick { + // The incoming insert request's timetick is less than the segment's create time tick, + // return ErrTimeTickTooOld and reallocate new timetick. + return nil, ErrTimeTickTooOld + } + + err := resource.Resource().SegmentAssignStatsManager().AllocRows(s.GetSegmentID(), req.InsertMetrics) + if err != nil { + return nil, err } s.dirtyBytes += req.InsertMetrics.BinarySize s.ackSem.Inc() @@ -181,7 +195,10 @@ func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentR // persist stats if too dirty. s.persistStatsIfTooDirty(ctx) - return inserted, s.ackSem + return &AssignSegmentResult{ + SegmentID: s.GetSegmentID(), + Acknowledge: s.ackSem, + }, nil } // Snapshot returns the snapshot of the segment assignment meta. @@ -237,7 +254,7 @@ func (m *mutableSegmentAssignmentMeta) IntoPending() { } // IntoGrowing transfers the segment assignment meta into growing state. -func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLimitation) { +func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLimitation, createSegmentTimeTick uint64) { if m.modifiedCopy.State != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING { panic("tranfer state to growing from non-pending state") } @@ -247,6 +264,7 @@ func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLim MaxBinarySize: limitation.SegmentSize, CreateTimestampNanoseconds: now, LastModifiedTimestampNanoseconds: now, + CreateSegmentTimeTick: createSegmentTimeTick, } } diff --git a/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go b/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go index bb2c8152086ba..6aac3370d2b41 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go @@ -2,12 +2,13 @@ package segment import ( "context" + "errors" "time" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/anypb" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/redo" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/manager" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" @@ -15,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/proto/messagespb" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -134,7 +136,7 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes return nil, err } // Assign segment for insert message. - // Current implementation a insert message only has one parition, but we need to merge the message for partition-key in future. + // !!! Current implementation a insert message only has one parition, but we need to merge the message for partition-key in future. header := insertMsg.Header() for _, partition := range header.GetPartitions() { result, err := impl.assignManager.Get().AssignSegment(ctx, &manager.AssignSegmentRequest{ @@ -147,6 +149,15 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes TimeTick: msg.TimeTick(), TxnSession: txn.GetTxnSessionFromContext(ctx), }) + if errors.Is(err, manager.ErrTimeTickTooOld) { + // If current time tick of insert message is too old to alloc segment, + // we just redo it to refresh a new latest timetick. + return nil, redo.ErrRedo + } + if errors.Is(err, manager.ErrTooLargeInsert) { + // Message is too large, so retry operation is unrecoverable, can't be retry at client side. + return nil, status.NewUnrecoverableError("insert too large, binary size: %d", msg.EstimateSize()) + } if err != nil { return nil, err } @@ -173,17 +184,24 @@ func (impl *segmentInterceptor) handleManualFlushMessage(ctx context.Context, ms return nil, err } header := maunalFlushMsg.Header() - segmentIDs, err := impl.assignManager.Get().SealAllSegmentsAndFenceUntil(ctx, header.GetCollectionId(), header.GetFlushTs()) + segmentIDs, err := impl.assignManager.Get().SealAndFenceSegmentUntil(ctx, header.GetCollectionId(), header.GetFlushTs()) if err != nil { return nil, status.NewInner("segment seal failure with error: %s", err.Error()) } - - // create extra response for manual flush message. - extraResponse, err := anypb.New(&message.ManualFlushExtraResponse{ - SegmentIds: segmentIDs, + // Modify the extra response for manual flush message. + utility.ModifyAppendResultExtra(ctx, func(old *message.ManualFlushExtraResponse) *message.ManualFlushExtraResponse { + if old == nil { + return &messagespb.ManualFlushExtraResponse{SegmentIds: segmentIDs} + } + return &messagespb.ManualFlushExtraResponse{SegmentIds: append(old.GetSegmentIds(), segmentIDs...)} }) - if err != nil { - return nil, status.NewInner("create extra response failed with error: %s", err.Error()) + if len(segmentIDs) > 0 { + // There's some new segment sealed, we need to retry the manual flush operation refresh the context. + // If we don't refresh the context, the sequence of message in wal will be: + // FlushTsHere -> ManualFlush -> FlushSegment1 -> FlushSegment2 -> FlushSegment3. + // After refresh the context, keep the sequence of the message in the wal with following seq: + // FlushTsHere -> FlushSegment1 -> FlushSegment2 -> FlushSegment3 -> ManualFlush. + return nil, redo.ErrRedo } // send the manual flush message. @@ -192,7 +210,6 @@ func (impl *segmentInterceptor) handleManualFlushMessage(ctx context.Context, ms return nil, err } - utility.AttachAppendResultExtra(ctx, extraResponse) return msgID, nil } @@ -234,7 +251,7 @@ func (impl *segmentInterceptor) recoverPChannelManager(param interceptors.Interc } // register the manager into inspector, to do the seal asynchronously - inspector.GetSegmentSealedInspector().RegsiterPChannelManager(pm) + inspector.GetSegmentSealedInspector().RegisterPChannelManager(pm) impl.assignManager.Set(pm) impl.logger.Info("recover PChannel Assignment Manager success") return diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go index 2d9e8007c1ad5..11a57fc1c0868 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go @@ -60,7 +60,10 @@ type SyncOperationMetrics struct { // Return true if the segment is assigned. func (s *SegmentStats) AllocRows(m InsertMetrics) bool { if m.BinarySize > s.BinaryCanBeAssign() { - s.ReachLimit = true + if s.Insert.BinarySize > 0 { + // if the binary size is not empty, it means the segment cannot hold more data, mark it as reach limit. + s.ReachLimit = true + } return false } @@ -74,6 +77,16 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 { return s.MaxBinarySize - s.Insert.BinarySize } +// ShouldBeSealed returns if the segment should be sealed. +func (s *SegmentStats) ShouldBeSealed() bool { + return s.ReachLimit +} + +// IsEmpty returns if the segment is empty. +func (s *SegmentStats) IsEmpty() bool { + return s.Insert.Rows == 0 +} + // UpdateOnSync updates the stats of segment on sync. func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) { s.BinLogCounter += f.BinLogCounterIncr diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go index 84241147b7e4f..43676953bc873 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go @@ -4,12 +4,18 @@ import ( "fmt" "sync" + "github.com/cockroachdb/errors" "github.com/pingcap/log" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/util/paramtable" ) +var ( + ErrNotEnoughSpace = errors.New("not enough space") + ErrTooLargeInsert = errors.New("insert too large") +) + // StatsManager is the manager of stats. // It manages the insert stats of all segments, used to check if a segment has enough space to insert or should be sealed. // If there will be a lock contention, we can optimize it by apply lock per segment. @@ -69,7 +75,7 @@ func (m *StatsManager) RegisterNewGrowingSegment(belongs SegmentBelongs, segment } // AllocRows alloc number of rows on current segment. -func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool { +func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) error { m.mu.Lock() defer m.mu.Unlock() @@ -78,7 +84,8 @@ func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool { if !ok { panic(fmt.Sprintf("alloc rows on a segment %d that not exist", segmentID)) } - inserted := m.segmentStats[segmentID].AllocRows(insert) + stat := m.segmentStats[segmentID] + inserted := stat.AllocRows(insert) // update the total stats if inserted. if inserted { @@ -91,12 +98,17 @@ func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool { m.vchannelStats[info.VChannel] = &InsertMetrics{} } m.vchannelStats[info.VChannel].Collect(insert) - return true + return nil } - // If not inserted, current segment can not hold the message, notify seal manager to do seal the segment. - m.sealNotifier.AddAndNotify(info) - return false + if stat.ShouldBeSealed() { + // notify seal manager to do seal the segment if stat reach the limit. + m.sealNotifier.AddAndNotify(info) + } + if stat.IsEmpty() { + return ErrTooLargeInsert + } + return ErrNotEnoughSpace } // SealNotifier returns the seal notifier. diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go index 427244db06ba5..efee056b66448 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go @@ -34,23 +34,32 @@ func TestStatsManager(t *testing.T) { assert.Len(t, m.vchannelStats, 3) assert.Len(t, m.pchannelStats, 2) + m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel2", VChannel: "vchannel3", CollectionID: 2, PartitionID: 5, SegmentID: 7}, 7, createSegmentStats(0, 0, 300)) + assert.Len(t, m.segmentStats, 5) + assert.Len(t, m.segmentIndex, 5) + assert.Len(t, m.vchannelStats, 3) + assert.Len(t, m.pchannelStats, 2) + assert.Panics(t, func() { m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 3}, 3, createSegmentStats(100, 100, 300)) }) shouldBlock(t, m.SealNotifier().WaitChan()) - m.AllocRows(3, InsertMetrics{Rows: 50, BinarySize: 50}) + err := m.AllocRows(3, InsertMetrics{Rows: 50, BinarySize: 50}) + assert.NoError(t, err) stat := m.GetStatsOfSegment(3) assert.Equal(t, uint64(150), stat.Insert.BinarySize) shouldBlock(t, m.SealNotifier().WaitChan()) - m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250}) + err = m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250}) + assert.ErrorIs(t, err, ErrNotEnoughSpace) <-m.SealNotifier().WaitChan() infos := m.SealNotifier().Get() assert.Len(t, infos, 1) - m.AllocRows(6, InsertMetrics{Rows: 150, BinarySize: 150}) + err = m.AllocRows(6, InsertMetrics{Rows: 150, BinarySize: 150}) + assert.NoError(t, err) shouldBlock(t, m.SealNotifier().WaitChan()) assert.Equal(t, uint64(250), m.vchannelStats["vchannel3"].BinarySize) @@ -67,17 +76,25 @@ func TestStatsManager(t *testing.T) { m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100}) shouldBlock(t, m.SealNotifier().WaitChan()) - m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400}) - m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250}) - m.AllocRows(6, InsertMetrics{Rows: 400, BinarySize: 400}) + err = m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400}) + assert.ErrorIs(t, err, ErrNotEnoughSpace) + err = m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250}) + assert.ErrorIs(t, err, ErrNotEnoughSpace) + err = m.AllocRows(6, InsertMetrics{Rows: 400, BinarySize: 400}) + assert.ErrorIs(t, err, ErrNotEnoughSpace) <-m.SealNotifier().WaitChan() infos = m.SealNotifier().Get() assert.Len(t, infos, 3) + err = m.AllocRows(7, InsertMetrics{Rows: 400, BinarySize: 400}) + assert.ErrorIs(t, err, ErrTooLargeInsert) + shouldBlock(t, m.SealNotifier().WaitChan()) + m.UnregisterSealedSegment(3) m.UnregisterSealedSegment(4) m.UnregisterSealedSegment(5) m.UnregisterSealedSegment(6) + m.UnregisterSealedSegment(7) assert.Empty(t, m.segmentStats) assert.Empty(t, m.vchannelStats) assert.Empty(t, m.pchannelStats) diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go index 7f294f6560411..732f8ac1ad6c9 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go @@ -59,6 +59,8 @@ func TestSegmentStats(t *testing.T) { assert.Equal(t, stat.Insert.Rows, uint64(160)) assert.Equal(t, stat.Insert.BinarySize, uint64(320)) assert.True(t, time.Now().After(now)) + assert.False(t, stat.IsEmpty()) + assert.False(t, stat.ShouldBeSealed()) insert1 = InsertMetrics{ Rows: 100, @@ -68,6 +70,8 @@ func TestSegmentStats(t *testing.T) { assert.False(t, inserted) assert.Equal(t, stat.Insert.Rows, uint64(160)) assert.Equal(t, stat.Insert.BinarySize, uint64(320)) + assert.False(t, stat.IsEmpty()) + assert.True(t, stat.ShouldBeSealed()) stat.UpdateOnSync(SyncOperationMetrics{ BinLogCounterIncr: 4, @@ -76,3 +80,20 @@ func TestSegmentStats(t *testing.T) { assert.Equal(t, uint64(7), stat.BinLogCounter) assert.Equal(t, uint64(13), stat.BinLogFileCounter) } + +func TestOversizeAlloc(t *testing.T) { + now := time.Now() + stat := &SegmentStats{ + Insert: InsertMetrics{}, + MaxBinarySize: 400, + CreateTime: now, + LastModifiedTime: now, + } + // Try to alloc a oversized insert metrics. + inserted := stat.AllocRows(InsertMetrics{ + BinarySize: 401, + }) + assert.False(t, inserted) + assert.True(t, stat.IsEmpty()) + assert.False(t, stat.ShouldBeSealed()) +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index e42574117bbfa..15fcf5af44b77 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -202,7 +202,7 @@ func (impl *timeTickAppendInterceptor) appendMsg( return nil, err } - utility.AttachAppendResultTimeTick(ctx, msg.TimeTick()) - utility.AttachAppendResultTxnContext(ctx, msg.TxnContext()) + utility.ReplaceAppendResultTimeTick(ctx, msg.TimeTick()) + utility.ReplaceAppendResultTxnContext(ctx, msg.TxnContext()) return msgID, nil } diff --git a/internal/streamingnode/server/wal/utility/context.go b/internal/streamingnode/server/wal/utility/context.go index 8b9453e36653e..ee6b3fd3fd04f 100644 --- a/internal/streamingnode/server/wal/utility/context.go +++ b/internal/streamingnode/server/wal/utility/context.go @@ -2,8 +2,9 @@ package utility import ( "context" + "reflect" - "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/reflect/protoreflect" "github.com/milvus-io/milvus/pkg/streaming/util/message" ) @@ -20,7 +21,7 @@ var ( type ExtraAppendResult struct { TimeTick uint64 TxnCtx *message.TxnContext - Extra *anypb.Any + Extra protoreflect.ProtoMessage } // NotPersistedHint is the hint of not persisted message. @@ -47,20 +48,29 @@ func WithExtraAppendResult(ctx context.Context, r *ExtraAppendResult) context.Co return context.WithValue(ctx, extraAppendResultValue, r) } -// AttachAppendResultExtra set extra to context -func AttachAppendResultExtra(ctx context.Context, extra *anypb.Any) { +// ModifyAppendResultExtra modify extra in context +func ModifyAppendResultExtra[M protoreflect.ProtoMessage](ctx context.Context, modifier func(old M) (new M)) { result := ctx.Value(extraAppendResultValue) - result.(*ExtraAppendResult).Extra = extra + var old M + if result.(*ExtraAppendResult).Extra != nil { + old = result.(*ExtraAppendResult).Extra.(M) + } + new := modifier(old) + if reflect.ValueOf(new).IsNil() { + result.(*ExtraAppendResult).Extra = nil + return + } + result.(*ExtraAppendResult).Extra = new } -// AttachAppendResultTimeTick set time tick to context -func AttachAppendResultTimeTick(ctx context.Context, timeTick uint64) { +// ReplaceAppendResultTimeTick set time tick to context +func ReplaceAppendResultTimeTick(ctx context.Context, timeTick uint64) { result := ctx.Value(extraAppendResultValue) result.(*ExtraAppendResult).TimeTick = timeTick } -// AttachAppendResultTxnContext set txn context to context -func AttachAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext) { +// ReplaceAppendResultTxnContext set txn context to context +func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext) { result := ctx.Value(extraAppendResultValue) result.(*ExtraAppendResult).TxnCtx = txnCtx } diff --git a/pkg/go.mod b/pkg/go.mod index c8b02ff79019d..3cd31c58c59ab 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 @@ -26,8 +26,6 @@ require ( github.com/shirou/gopsutil/v3 v3.22.9 github.com/sirupsen/logrus v1.9.0 github.com/spaolacci/murmur3 v1.1.0 - github.com/spf13/cast v1.3.1 - github.com/spf13/viper v1.8.1 github.com/streamnative/pulsarctl v0.5.0 github.com/stretchr/testify v1.9.0 github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c @@ -57,6 +55,7 @@ require ( google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 gopkg.in/natefinch/lumberjack.v2 v2.0.0 + gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.28.6 ) @@ -86,7 +85,6 @@ require ( github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/getsentry/sentry-go v0.12.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -99,7 +97,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect @@ -107,19 +104,16 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/hcl v1.0.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.11.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.8 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/highwayhash v1.0.2 // indirect - github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect @@ -128,7 +122,6 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pelletier/go-toml v1.9.3 // indirect github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect @@ -144,14 +137,10 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect - github.com/smartystreets/assertions v1.1.0 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/spf13/afero v1.6.0 // indirect - github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stathat/consistent v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - github.com/subosito/gotenv v1.2.0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect @@ -180,9 +169,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/pkg/go.sum b/pkg/go.sum index 0ce2c42e4740c..b325fb6b86d79 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -335,8 +335,6 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0= -github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= @@ -374,7 +372,6 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= @@ -417,7 +414,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -439,7 +435,6 @@ github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -493,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd h1:x0b0+foTe23sKcVFseR1DE8+BB08EH6ViiRHaz8PEik= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 h1:Pkz4jXqYQVY8nbZkY0+G4K5toZJaT4LxUrrYwGxwi3I= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= @@ -507,8 +502,6 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= -github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -557,8 +550,6 @@ github.com/panjf2000/ants/v2 v2.7.2 h1:2NUt9BaZFO5kQzrieOmK/wdb/tQ/K+QHaxN8sOgD6 github.com/panjf2000/ants/v2 v2.7.2/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= -github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= @@ -581,7 +572,6 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -657,9 +647,6 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/assertions v1.1.0 h1:MkTeG1DMwsrdH7QtLXy5W+fUxWq+vmb6cLmyJ7aRtF0= -github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= @@ -668,25 +655,17 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= -github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= -github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo= github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= -github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= -github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44= -github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -708,7 +687,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= @@ -853,7 +831,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -1002,7 +979,6 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1283,8 +1259,6 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= -gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= diff --git a/pkg/streaming/proto/messages.proto b/pkg/streaming/proto/messages.proto index 896435dfc1d59..62b84f98a2c80 100644 --- a/pkg/streaming/proto/messages.proto +++ b/pkg/streaming/proto/messages.proto @@ -34,6 +34,7 @@ enum MessageType { CreatePartition = 7; DropPartition = 8; ManualFlush = 9; + CreateSegment = 10; // begin transaction message is only used for transaction, once a begin // transaction message is received, all messages combined with the // transaction message cannot be consumed until a CommitTxn message @@ -80,6 +81,18 @@ message FlushMessageBody { // ManualFlushMessageBody is the body of manual flush message. message ManualFlushMessageBody {} +// CreateSegmentMessageBody is the body of create segment message. +message CreateSegmentMessageBody { + int64 collection_id = 1; + repeated CreateSegmentInfo segments = 2; +} + +// CreateSegmentInfo is the info of create segment. +message CreateSegmentInfo { + int64 partition_id = 1; + int64 segment_id = 2; +} + // BeginTxnMessageBody is the body of begin transaction message. // Just do nothing now. message BeginTxnMessageBody {} @@ -138,6 +151,9 @@ message DeleteMessageHeader { // FlushMessageHeader just nothing. message FlushMessageHeader {} +// CreateSegmentMessageHeader just nothing. +message CreateSegmentMessageHeader {} + message ManualFlushMessageHeader { int64 collection_id = 1; uint64 flush_ts = 2; diff --git a/pkg/streaming/proto/streaming.proto b/pkg/streaming/proto/streaming.proto index 7b623718a6943..2a6eb496c5d7a 100644 --- a/pkg/streaming/proto/streaming.proto +++ b/pkg/streaming/proto/streaming.proto @@ -432,4 +432,5 @@ message SegmentAssignmentStat { int64 create_timestamp_nanoseconds = 4; int64 last_modified_timestamp_nanoseconds = 5; uint64 binlog_counter = 6; + uint64 create_segment_time_tick = 7; // The timetick of create segment message in wal. } diff --git a/pkg/streaming/util/message/adaptor/message.go b/pkg/streaming/util/message/adaptor/message.go index f712364f10a4e..9c3335d952590 100644 --- a/pkg/streaming/util/message/adaptor/message.go +++ b/pkg/streaming/util/message/adaptor/message.go @@ -139,6 +139,8 @@ func fromMessageToTsMsgV2(msg message.ImmutableMessage) (msgstream.TsMsg, error) tsMsg, err = NewFlushMessageBody(msg) case message.MessageTypeManualFlush: tsMsg, err = NewManualFlushMessageBody(msg) + case message.MessageTypeCreateSegment: + tsMsg, err = NewCreateSegmentMessageBody(msg) default: panic("unsupported message type") } diff --git a/pkg/streaming/util/message/adaptor/message_test.go b/pkg/streaming/util/message/adaptor/message_test.go index 76038ddd3f5a9..5196405d51b97 100644 --- a/pkg/streaming/util/message/adaptor/message_test.go +++ b/pkg/streaming/util/message/adaptor/message_test.go @@ -70,3 +70,21 @@ func TestNewMsgPackFromCreateCollectionMessage(t *testing.T) { assert.Equal(t, tt, pack.BeginTs) assert.Equal(t, tt, pack.EndTs) } + +func TestNewMsgPackFromCreateSegmentMessage(t *testing.T) { + id := rmq.NewRmqID(1) + + tt := uint64(time.Now().UnixNano()) + mutableMsg, err := message.NewCreateSegmentMessageBuilderV2(). + WithHeader(&message.CreateSegmentMessageHeader{}). + WithBody(&message.CreateSegmentMessageBody{}). + WithVChannel("v1"). + BuildMutable() + assert.NoError(t, err) + immutableCreateSegmentMsg := mutableMsg.WithTimeTick(tt).WithLastConfirmedUseMessageID().IntoImmutableMessage(id) + pack, err := NewMsgPackFromMessage(immutableCreateSegmentMsg) + assert.NoError(t, err) + assert.NotNil(t, pack) + assert.Equal(t, tt, pack.BeginTs) + assert.Equal(t, tt, pack.EndTs) +} diff --git a/pkg/streaming/util/message/adaptor/message_type.go b/pkg/streaming/util/message/adaptor/message_type.go index ea3ab24389658..8338ca78b296c 100644 --- a/pkg/streaming/util/message/adaptor/message_type.go +++ b/pkg/streaming/util/message/adaptor/message_type.go @@ -11,6 +11,7 @@ var messageTypeToCommonpbMsgType = map[message.MessageType]commonpb.MsgType{ message.MessageTypeDelete: commonpb.MsgType_Delete, message.MessageTypeFlush: commonpb.MsgType_FlushSegment, message.MessageTypeManualFlush: commonpb.MsgType_ManualFlush, + message.MessageTypeCreateSegment: commonpb.MsgType_CreateSegment, message.MessageTypeCreateCollection: commonpb.MsgType_CreateCollection, message.MessageTypeDropCollection: commonpb.MsgType_DropCollection, message.MessageTypeCreatePartition: commonpb.MsgType_CreatePartition, diff --git a/pkg/streaming/util/message/adaptor/ts_msg_newer.go b/pkg/streaming/util/message/adaptor/ts_msg_newer.go index 0fb558deb4b12..fe7aa12c79400 100644 --- a/pkg/streaming/util/message/adaptor/ts_msg_newer.go +++ b/pkg/streaming/util/message/adaptor/ts_msg_newer.go @@ -50,6 +50,30 @@ func (t *tsMsgImpl) SetTs(ts uint64) { t.ts = ts } +type CreateSegmentMessageBody struct { + *tsMsgImpl + CreateSegmentMessage message.ImmutableCreateSegmentMessageV2 +} + +func NewCreateSegmentMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error) { + createMsg, err := message.AsImmutableCreateSegmentMessageV2(msg) + if err != nil { + return nil, err + } + return &CreateSegmentMessageBody{ + tsMsgImpl: &tsMsgImpl{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: msg.TimeTick(), + EndTimestamp: msg.TimeTick(), + }, + ts: msg.TimeTick(), + sz: msg.EstimateSize(), + msgType: MustGetCommonpbMsgTypeFromMessageType(msg.MessageType()), + }, + CreateSegmentMessage: createMsg, + }, nil +} + type FlushMessageBody struct { *tsMsgImpl FlushMessage message.ImmutableFlushMessageV2 diff --git a/pkg/streaming/util/message/builder.go b/pkg/streaming/util/message/builder.go index 45ed16b9fa326..32bdad9db6482 100644 --- a/pkg/streaming/util/message/builder.go +++ b/pkg/streaming/util/message/builder.go @@ -44,6 +44,7 @@ var ( NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]() NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]() NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]() + NewCreateSegmentMessageBuilderV2 = createNewMessageBuilderV2[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]() NewFlushMessageBuilderV2 = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]() NewManualFlushMessageBuilderV2 = createNewMessageBuilderV2[*ManualFlushMessageHeader, *ManualFlushMessageBody]() NewBeginTxnMessageBuilderV2 = createNewMessageBuilderV2[*BeginTxnMessageHeader, *BeginTxnMessageBody]() diff --git a/pkg/streaming/util/message/message_impl.go b/pkg/streaming/util/message/message_impl.go index d151ea90b12b4..41e9ac0379af2 100644 --- a/pkg/streaming/util/message/message_impl.go +++ b/pkg/streaming/util/message/message_impl.go @@ -65,33 +65,20 @@ func (m *messageImpl) WithWALTerm(term int64) MutableMessage { // WithTimeTick sets the time tick of current message. func (m *messageImpl) WithTimeTick(tt uint64) MutableMessage { - if m.properties.Exist(messageTimeTick) { - panic("time tick already set in properties of message") - } m.properties.Set(messageTimeTick, EncodeUint64(tt)) return m } // WithLastConfirmed sets the last confirmed message id of current message. func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { - if m.properties.Exist(messageLastConfirmedIDSameWithMessageID) { - panic("last confirmed message already set in properties of message") - } - if m.properties.Exist(messageLastConfirmed) { - panic("last confirmed message already set in properties of message") - } + m.properties.Delete(messageLastConfirmedIDSameWithMessageID) m.properties.Set(messageLastConfirmed, id.Marshal()) return m } // WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. func (m *messageImpl) WithLastConfirmedUseMessageID() MutableMessage { - if m.properties.Exist(messageLastConfirmedIDSameWithMessageID) { - panic("last confirmed message already set in properties of message") - } - if m.properties.Exist(messageLastConfirmed) { - panic("last confirmed message already set in properties of message") - } + m.properties.Delete(messageLastConfirmed) m.properties.Set(messageLastConfirmedIDSameWithMessageID, "") return m } diff --git a/pkg/streaming/util/message/message_type.go b/pkg/streaming/util/message/message_type.go index a2a2d3369bd45..b6c74d08e2193 100644 --- a/pkg/streaming/util/message/message_type.go +++ b/pkg/streaming/util/message/message_type.go @@ -13,6 +13,7 @@ const ( MessageTypeTimeTick MessageType = MessageType(messagespb.MessageType_TimeTick) MessageTypeInsert MessageType = MessageType(messagespb.MessageType_Insert) MessageTypeDelete MessageType = MessageType(messagespb.MessageType_Delete) + MessageTypeCreateSegment MessageType = MessageType(messagespb.MessageType_CreateSegment) MessageTypeFlush MessageType = MessageType(messagespb.MessageType_Flush) MessageTypeManualFlush MessageType = MessageType(messagespb.MessageType_ManualFlush) MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection) @@ -31,6 +32,7 @@ var messageTypeName = map[MessageType]string{ MessageTypeInsert: "INSERT", MessageTypeDelete: "DELETE", MessageTypeFlush: "FLUSH", + MessageTypeCreateSegment: "CREATE_SEGMENT", MessageTypeManualFlush: "MANUAL_FLUSH", MessageTypeCreateCollection: "CREATE_COLLECTION", MessageTypeDropCollection: "DROP_COLLECTION", diff --git a/pkg/streaming/util/message/specialized_message.go b/pkg/streaming/util/message/specialized_message.go index 9ee1892ee2579..a9fec13f4597f 100644 --- a/pkg/streaming/util/message/specialized_message.go +++ b/pkg/streaming/util/message/specialized_message.go @@ -22,6 +22,7 @@ type ( CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader FlushMessageHeader = messagespb.FlushMessageHeader + CreateSegmentMessageHeader = messagespb.CreateSegmentMessageHeader ManualFlushMessageHeader = messagespb.ManualFlushMessageHeader BeginTxnMessageHeader = messagespb.BeginTxnMessageHeader CommitTxnMessageHeader = messagespb.CommitTxnMessageHeader @@ -30,12 +31,13 @@ type ( ) type ( - FlushMessageBody = messagespb.FlushMessageBody - ManualFlushMessageBody = messagespb.ManualFlushMessageBody - BeginTxnMessageBody = messagespb.BeginTxnMessageBody - CommitTxnMessageBody = messagespb.CommitTxnMessageBody - RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody - TxnMessageBody = messagespb.TxnMessageBody + FlushMessageBody = messagespb.FlushMessageBody + CreateSegmentMessageBody = messagespb.CreateSegmentMessageBody + ManualFlushMessageBody = messagespb.ManualFlushMessageBody + BeginTxnMessageBody = messagespb.BeginTxnMessageBody + CommitTxnMessageBody = messagespb.CommitTxnMessageBody + RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody + TxnMessageBody = messagespb.TxnMessageBody ) type ( @@ -51,6 +53,7 @@ var messageTypeMap = map[reflect.Type]MessageType{ reflect.TypeOf(&DropCollectionMessageHeader{}): MessageTypeDropCollection, reflect.TypeOf(&CreatePartitionMessageHeader{}): MessageTypeCreatePartition, reflect.TypeOf(&DropPartitionMessageHeader{}): MessageTypeDropPartition, + reflect.TypeOf(&CreateSegmentMessageHeader{}): MessageTypeCreateSegment, reflect.TypeOf(&FlushMessageHeader{}): MessageTypeFlush, reflect.TypeOf(&ManualFlushMessageHeader{}): MessageTypeManualFlush, reflect.TypeOf(&BeginTxnMessageHeader{}): MessageTypeBeginTxn, @@ -77,6 +80,7 @@ type ( MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] + MutableCreateSegmentMessageV2 = specializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] MutableBeginTxnMessageV2 = specializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] MutableCommitTxnMessageV2 = specializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] @@ -89,6 +93,7 @@ type ( ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] + ImmutableCreateSegmentMessageV2 = specializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] ImmutableManualFlushMessageV2 = specializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] ImmutableBeginTxnMessageV2 = specializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] @@ -105,6 +110,7 @@ var ( AsMutableDropCollectionMessageV1 = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] AsMutableCreatePartitionMessageV1 = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] AsMutableDropPartitionMessageV1 = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] + AsMutableCreateSegmentMessageV2 = asSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] AsMutableFlushMessageV2 = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] AsMutableManualFlushMessageV2 = asSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] AsMutableBeginTxnMessageV2 = asSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] @@ -118,6 +124,7 @@ var ( AsImmutableDropCollectionMessageV1 = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] AsImmutableCreatePartitionMessageV1 = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] AsImmutableDropPartitionMessageV1 = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] + AsImmutableCreateSegmentMessageV2 = asSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] AsImmutableFlushMessageV2 = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] AsImmutableManualFlushMessageV2 = asSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] AsImmutableBeginTxnMessageV2 = asSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]