Skip to content

Commit

Permalink
enhance: add create segment message, enable empty segment flush
Browse files Browse the repository at this point in the history
- add redo interceptor to implement append context refresh. (make new timetick)
- add create segment handler for flusher.
- make empty segment flushable and directly change it into dropped.
- add create segment message into wal when creating new growing segment.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Nov 5, 2024
1 parent 0645d46 commit 32bed9b
Show file tree
Hide file tree
Showing 61 changed files with 1,065 additions and 191 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,10 +627,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 h1:Pkz4jXqYQVY8nbZkY0+G4K5toZJaT4LxUrrYwGxwi3I=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
9 changes: 5 additions & 4 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand All @@ -44,11 +45,11 @@ packages:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:
Expand Down
18 changes: 18 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,24 @@ func UpdateIsImporting(segmentID int64, isImporting bool) UpdateOperator {
}
}

// UpdateAsDroppedIfEmptyWhenFlushing updates segment state to Dropped if segment is empty and in Flushing state
// It's used to make a empty flushing segment to be dropped directly.
func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update as dropped if empty when flusing failed - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
if segment.GetNumOfRows() == 0 && segment.GetState() == commonpb.SegmentState_Flushing {
log.Info("meta update: update as dropped if empty when flusing", zap.Int64("segmentID", segmentID))
updateSegStateAndPrepareMetrics(segment, commonpb.SegmentState_Dropped, modPack.metricMutation)
}
return true
}
}

// updateSegmentsInfo update segment infos
// will exec all operators, and update all changed segments
func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
Expand Down
14 changes: 14 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,20 @@ func TestUpdateSegmentsInfo(t *testing.T) {
UpdateIsImporting(1, true),
)
assert.NoError(t, err)

err = meta.UpdateSegmentsInfo(UpdateAsDroppedIfEmptyWhenFlushing(1))
assert.NoError(t, err)
})

t.Run("update empty segment into flush", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
meta.AddSegment(context.Background(), &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}})
err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
UpdateAsDroppedIfEmptyWhenFlushing(1),
)
assert.NoError(t, err)
})

t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateStartPosition(req.GetStartPositions()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()),
UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()),
)

// Update segment info in memory and meta.
Expand Down
34 changes: 31 additions & 3 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,19 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
segments := map[int64]commonpb.SegmentState{
0: commonpb.SegmentState_Flushed,
1: commonpb.SegmentState_Sealed,
2: commonpb.SegmentState_Sealed,
}
for segID, state := range segments {
numOfRows := int64(100)
if segID == 2 {
numOfRows = 0
}
info := &datapb.SegmentInfo{
ID: segID,
InsertChannel: "ch1",
State: state,
Level: datapb.SegmentLevel_L1,
NumOfRows: numOfRows,
}
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
s.Require().NoError(err)
Expand All @@ -262,11 +268,14 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
inSegID int64
inDropped bool
inFlushed bool
numOfRows int64

expectedState commonpb.SegmentState
}{
{"segID=0, flushed to dropped", 0, true, false, commonpb.SegmentState_Dropped},
{"segID=1, sealed to flushing", 1, false, true, commonpb.SegmentState_Flushing},
{"segID=0, flushed to dropped", 0, true, false, 100, commonpb.SegmentState_Dropped},
{"segID=1, sealed to flushing", 1, false, true, 100, commonpb.SegmentState_Flushing},
// empty segment flush should be dropped directly.
{"segID=2, sealed to dropped", 2, false, true, 0, commonpb.SegmentState_Dropped},
}

paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "False")
Expand All @@ -289,7 +298,7 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
segment := s.testServer.meta.GetSegment(test.inSegID)
s.NotNil(segment)
s.EqualValues(0, len(segment.GetBinlogs()))
s.EqualValues(segment.NumOfRows, 0)
s.EqualValues(segment.NumOfRows, test.numOfRows)

flushing := []commonpb.SegmentState{commonpb.SegmentState_Flushed, commonpb.SegmentState_Flushing}
if lo.Contains(flushing, test.expectedState) {
Expand Down Expand Up @@ -361,6 +370,7 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
segments := map[int64]int64{
0: 0,
1: 0,
2: 0,
}
for segID, collID := range segments {
info := &datapb.SegmentInfo{
Expand Down Expand Up @@ -445,6 +455,24 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
s.EqualValues(segment.DmlPosition.ChannelName, "ch1")
s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3})
s.EqualValues(segment.NumOfRows, 10)

resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 2,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
CheckPoints: []*datapb.CheckPoint{},
Flushed: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetSegment(2)
s.NotNil(segment)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
}

func (s *ServerSuite) TestFlush_NormalCase() {
Expand Down
10 changes: 9 additions & 1 deletion internal/flushcommon/io/mock_binlogio.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions internal/flushcommon/metacache/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ func WithNoSyncingTask() SegmentFilter {

type SegmentAction func(info *SegmentInfo)

func SegmentActions(actions ...SegmentAction) SegmentAction {
return func(info *SegmentInfo) {
for _, act := range actions {
act(info)
}
}
}

func UpdateState(state commonpb.SegmentState) SegmentAction {
return func(info *SegmentInfo) {
info.state = state
Expand All @@ -147,6 +155,14 @@ func UpdateNumOfRows(numOfRows int64) SegmentAction {
}
}

func SetStartPositionIfNil(startPos *msgpb.MsgPosition) SegmentAction {
return func(info *SegmentInfo) {
if info.startPosition == nil {
info.startPosition = startPos
}
}
}

func UpdateBufferedRows(bufferedRows int64) SegmentAction {
return func(info *SegmentInfo) {
info.bufferRows = bufferedRows
Expand Down
7 changes: 7 additions & 0 deletions internal/flushcommon/metacache/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (s *SegmentActionSuite) TestActions() {
action = UpdateNumOfRows(numOfRows)
action(info)
s.Equal(numOfRows, info.NumOfRows())

info = &SegmentInfo{}
actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows))
actions(info)
s.Equal(state, info.State())
s.Equal(cp, info.Checkpoint())
s.Equal(numOfRows, info.NumOfRows())
}

func (s *SegmentActionSuite) TestMergeActions() {
Expand Down
2 changes: 1 addition & 1 deletion internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
flushed,
unflushed,
params.CompactionExecutor,
params.FlushMsgHandler,
params.MsgHandler,
)
if err != nil {
return nil, err
Expand Down
23 changes: 18 additions & 5 deletions internal/flushcommon/pipeline/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type ddNode struct {

dropMode atomic.Value
compactionExecutor compaction.Executor
flushMsgHandler flusher.FlushMsgHandler
msgHandler flusher.MsgHandler

// for recovery
growingSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo // segmentID
Expand Down Expand Up @@ -236,6 +236,19 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Add(float64(dmsg.GetNumRows()))
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)
case commonpb.MsgType_CreateSegment:
createSegment := msg.(*adaptor.CreateSegmentMessageBody)
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", createSegment.CreateSegmentMessage.TimeTick()),
)
logger.Info("receive create segment message")
if err := ddn.msgHandler.HandleCreateSegment(context.Background(), ddn.vChannelName, createSegment.CreateSegmentMessage); err != nil {
logger.Warn("handle create segment message failed", zap.Error(err))

Check warning on line 248 in internal/flushcommon/pipeline/flow_graph_dd_node.go

View check run for this annotation

Codecov / codecov/patch

internal/flushcommon/pipeline/flow_graph_dd_node.go#L248

Added line #L248 was not covered by tests
} else {
logger.Info("handle create segment message success")
}
case commonpb.MsgType_FlushSegment:
flushMsg := msg.(*adaptor.FlushMessageBody)
logger := log.With(
Expand All @@ -244,7 +257,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Uint64("timetick", flushMsg.FlushMessage.TimeTick()),
)
logger.Info("receive flush message")
if err := ddn.flushMsgHandler.HandleFlush(ddn.vChannelName, flushMsg.FlushMessage); err != nil {
if err := ddn.msgHandler.HandleFlush(ddn.vChannelName, flushMsg.FlushMessage); err != nil {
logger.Warn("handle flush message failed", zap.Error(err))
} else {
logger.Info("handle flush message success")
Expand All @@ -258,7 +271,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Uint64("flushTs", manualFlushMsg.ManualFlushMessage.Header().FlushTs),
)
logger.Info("receive manual flush message")
if err := ddn.flushMsgHandler.HandleManualFlush(ddn.vChannelName, manualFlushMsg.ManualFlushMessage); err != nil {
if err := ddn.msgHandler.HandleManualFlush(ddn.vChannelName, manualFlushMsg.ManualFlushMessage); err != nil {
logger.Warn("handle manual flush message failed", zap.Error(err))
} else {
logger.Info("handle manual flush message success")
Expand Down Expand Up @@ -318,7 +331,7 @@ func (ddn *ddNode) Close() {
}

func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName string, droppedSegmentIDs []typeutil.UniqueID,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compaction.Executor, handler flusher.FlushMsgHandler,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compaction.Executor, handler flusher.MsgHandler,
) (*ddNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(paramtable.Get().DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
Expand All @@ -333,7 +346,7 @@ func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName strin
droppedSegmentIDs: droppedSegmentIDs,
vChannelName: vChannelName,
compactionExecutor: executor,
flushMsgHandler: handler,
msgHandler: handler,
}

dd.dropMode.Store(false)
Expand Down
Loading

0 comments on commit 32bed9b

Please sign in to comment.