Skip to content

Commit

Permalink
enhance: add create segment message, enable empty segment flush
Browse files Browse the repository at this point in the history
- add redo interceptor to implement append context refresh. (make new timetick)
- add create segment handler for flusher.
- make empty segment flushable and directly change it into dropped.
- add create segment message into wal when creating new growing segment.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Nov 4, 2024
1 parent ba9f36b commit 6ab0ba1
Show file tree
Hide file tree
Showing 57 changed files with 1,025 additions and 170 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,10 +627,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833 h1:Pkz4jXqYQVY8nbZkY0+G4K5toZJaT4LxUrrYwGxwi3I=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104061416-ec2484585833/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
9 changes: 5 additions & 4 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand All @@ -44,11 +45,11 @@ packages:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:
Expand Down
18 changes: 18 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,24 @@ func UpdateIsImporting(segmentID int64, isImporting bool) UpdateOperator {
}
}

// UpdateAsDroppedIfEmptyWhenFlushing updates segment state to Dropped if segment is empty and in Flushing state
// It's used to make a empty flushing segment to be dropped directly.
func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update as dropped if empty when flusing failed - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
if segment.GetNumOfRows() == 0 && segment.GetState() == commonpb.SegmentState_Flushing {
log.Info("meta update: update as dropped if empty when flusing", zap.Int64("segmentID", segmentID))
updateSegStateAndPrepareMetrics(segment, commonpb.SegmentState_Dropped, modPack.metricMutation)
}
return true
}
}

// updateSegmentsInfo update segment infos
// will exec all operators, and update all changed segments
func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
Expand Down
14 changes: 14 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,20 @@ func TestUpdateSegmentsInfo(t *testing.T) {
UpdateIsImporting(1, true),
)
assert.NoError(t, err)

err = meta.UpdateSegmentsInfo(UpdateAsDroppedIfEmptyWhenFlushing(1))
assert.NoError(t, err)
})

t.Run("update empty segment into flush", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
meta.AddSegment(context.Background(), &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}})
err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
UpdateAsDroppedIfEmptyWhenFlushing(1),
)
assert.NoError(t, err)
})

t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateStartPosition(req.GetStartPositions()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()),
UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()),
)

// Update segment info in memory and meta.
Expand Down
19 changes: 19 additions & 0 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
segments := map[int64]int64{
0: 0,
1: 0,
2: 0,
}
for segID, collID := range segments {
info := &datapb.SegmentInfo{
Expand Down Expand Up @@ -445,6 +446,24 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
s.EqualValues(segment.DmlPosition.ChannelName, "ch1")
s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3})
s.EqualValues(segment.NumOfRows, 10)

resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 2,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
CheckPoints: []*datapb.CheckPoint{},
Flushed: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetSegment(2)
s.NotNil(segment)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
}

func (s *ServerSuite) TestFlush_NormalCase() {
Expand Down
10 changes: 9 additions & 1 deletion internal/flushcommon/io/mock_binlogio.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions internal/flushcommon/metacache/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ func WithNoSyncingTask() SegmentFilter {

type SegmentAction func(info *SegmentInfo)

func SegmentActions(actions ...SegmentAction) SegmentAction {
return func(info *SegmentInfo) {
for _, act := range actions {
act(info)
}
}
}

func UpdateState(state commonpb.SegmentState) SegmentAction {
return func(info *SegmentInfo) {
info.state = state
Expand All @@ -147,6 +155,14 @@ func UpdateNumOfRows(numOfRows int64) SegmentAction {
}
}

func SetStartPositionIfNil(startPos *msgpb.MsgPosition) SegmentAction {
return func(info *SegmentInfo) {
if info.startPosition == nil {
info.startPosition = startPos
}
}
}

func UpdateBufferedRows(bufferedRows int64) SegmentAction {
return func(info *SegmentInfo) {
info.bufferRows = bufferedRows
Expand Down
7 changes: 7 additions & 0 deletions internal/flushcommon/metacache/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (s *SegmentActionSuite) TestActions() {
action = UpdateNumOfRows(numOfRows)
action(info)
s.Equal(numOfRows, info.NumOfRows())

info = &SegmentInfo{}
actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows))
actions(info)
s.Equal(state, info.State())
s.Equal(cp, info.Checkpoint())
s.Equal(numOfRows, info.NumOfRows())
}

func (s *SegmentActionSuite) TestMergeActions() {
Expand Down
13 changes: 13 additions & 0 deletions internal/flushcommon/pipeline/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,19 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Add(float64(dmsg.GetNumRows()))
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)
case commonpb.MsgType_CreateSegment:
createSegment := msg.(*adaptor.CreateSegmentMessageBody)
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", createSegment.CreateSegmentMessage.TimeTick()),
)
logger.Info("receive create segment message")
if err := ddn.flushMsgHandler.HandleCreateSegment(context.Background(), ddn.vChannelName, createSegment.CreateSegmentMessage); err != nil {
logger.Warn("handle create segment message failed", zap.Error(err))
} else {
logger.Info("handle create segment message success")
}
case commonpb.MsgType_FlushSegment:
flushMsg := msg.(*adaptor.FlushMessageBody)
logger := log.With(
Expand Down
55 changes: 55 additions & 0 deletions internal/flushcommon/pipeline/flow_graph_dd_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_flusher"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -92,6 +97,56 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
}
}

func TestFlowGraph_DDNode_OperateFlush(t *testing.T) {
h := mock_flusher.NewMockFlushMsgHandler(t)
h.EXPECT().HandleCreateSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil)
h.EXPECT().HandleFlush(mock.Anything, mock.Anything).Return(nil)
h.EXPECT().HandleManualFlush(mock.Anything, mock.Anything).Return(nil)

ddn := ddNode{
ctx: context.Background(),
collectionID: 1,
vChannelName: "v1",
flushMsgHandler: h,
}

mutableMsg, err := message.NewCreateSegmentMessageBuilderV2().
WithHeader(&message.CreateSegmentMessageHeader{}).
WithBody(&message.CreateSegmentMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableCreateSegmentMsg := mutableMsg.WithTimeTick(1).IntoImmutableMessage(mock_message.NewMockMessageID(t))

flushMsg, err := message.NewFlushMessageBuilderV2().
WithHeader(&message.FlushMessageHeader{}).
WithBody(&message.FlushMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableFlushMsg := flushMsg.WithTimeTick(2).IntoImmutableMessage(mock_message.NewMockMessageID(t))

manualFlushMsg, err := message.NewManualFlushMessageBuilderV2().
WithHeader(&message.ManualFlushMessageHeader{}).
WithBody(&message.ManualFlushMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableManualFlushMsg := manualFlushMsg.WithTimeTick(3).IntoImmutableMessage(mock_message.NewMockMessageID(t))

msg1, err := adaptor.NewCreateSegmentMessageBody(immutableCreateSegmentMsg)
assert.NoError(t, err)
msg2, err := adaptor.NewFlushMessageBody(immutableFlushMsg)
assert.NoError(t, err)
msg3, err := adaptor.NewManualFlushMessageBody(immutableManualFlushMsg)
assert.NoError(t, err)

tsMessages := []msgstream.TsMsg{msg1, msg2, msg3}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
outputMsgs := ddn.Operate([]Msg{msgStreamMsg})
assert.NotNil(t, outputMsgs)
}

func TestFlowGraph_DDNode_Operate(t *testing.T) {
t.Run("Test DDNode Operate DropCollection Msg", func(t *testing.T) {
// invalid inputs
Expand Down
22 changes: 21 additions & 1 deletion internal/flushcommon/pipeline/mock_fgmanager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion internal/flushcommon/syncmgr/mock_meta_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6ab0ba1

Please sign in to comment.