-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhance: add create segment message, enable empty segment flush #37407
base: master
Are you sure you want to change the base?
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: chyezh The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@chyezh go-sdk check failed, comment |
@chyezh E2e jenkins job failed, comment |
39b8028
to
cb5a5e9
Compare
@@ -1013,6 +1013,24 @@ func UpdateIsImporting(segmentID int64, isImporting bool) UpdateOperator { | |||
} | |||
} | |||
|
|||
// UpdateAsDroppedIfEmptyWhenFlushing updates segment state to Dropped if segment is empty and in Flushing state | |||
// It's used to make a empty flushing segment to be dropped directly. | |||
func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used to set a empty segment into dropped when savebinlogpath
@@ -147,6 +155,14 @@ func UpdateNumOfRows(numOfRows int64) SegmentAction { | |||
} | |||
} | |||
|
|||
func SetStartPositionIfNil(startPos *msgpb.MsgPosition) SegmentAction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setup startPosition when first insert message comes, applied to writebuffer
@@ -236,6 +236,19 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { | |||
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel). | |||
Add(float64(dmsg.GetNumRows())) | |||
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg) | |||
case commonpb.MsgType_CreateSegment: | |||
createSegment := msg.(*adaptor.CreateSegmentMessageBody) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create new segment when create segment incoming.
TODO: Move the create new segment operation on datacoord at interceptor to here.
@@ -257,7 +258,11 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B | |||
BF: pks.PkFilter, | |||
PkType: int64(s.pkField.GetDataType()), | |||
} | |||
}), segment.NumOfRows()) | |||
}) | |||
if len(stats) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allow to merge a empty stats if the segment is empty.
@chyezh E2e jenkins job failed, comment |
cb5a5e9
to
6ab0ba1
Compare
@chyezh go-sdk check failed, comment |
@chyezh cpp-unit-test check failed, comment |
@chyezh E2e jenkins job failed, comment |
logger := log.With( | ||
zap.String("vchannel", ddn.Name()), | ||
zap.Int32("msgType", int32(msg.Type())), | ||
zap.Uint64("timetick", createSegment.CreateSegmentMessage.TimeTick()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to log segmentID as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix it soon
@@ -16,9 +16,15 @@ | |||
|
|||
package flusher | |||
|
|||
import "github.com/milvus-io/milvus/pkg/streaming/util/message" | |||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename the file?
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
// no-op if the incoming time tick is less than the fenced time tick. | ||
if timeTick <= m.fencedAssignTimeTick { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Concurrent maunalflush incoming, The older manualflush will be ignored.
- If manualflush trigger a flushsegment operation, it will redo the operation to generate a new timetick to keep the message sequence:flushTs -> flushsegment -> flushsegment -> manualflush. So the redo operation will be rejected here. In previous implementation, the sequence will be flushTs -> manualflush -> flushsegment -> flushsegment, it's wierd.
0807515
to
9a09f1a
Compare
@chyezh E2e jenkins job failed, comment |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #37407 +/- ##
=======================================
Coverage 67.31% 67.31%
=======================================
Files 290 290
Lines 25377 25377
=======================================
Hits 17082 17082
Misses 8295 8295
|
9a09f1a
to
a2fbfc4
Compare
@chyezh E2e jenkins job failed, comment |
a2fbfc4
to
32bed9b
Compare
rerun ut |
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
- add redo interceptor to implement append context refresh. (make new timetick) - add create segment handler for flusher. - make empty segment flushable and directly change it into dropped. - add create segment message into wal when creating new growing segment. Signed-off-by: chyezh <[email protected]>
32bed9b
to
1936a4d
Compare
@chyezh E2e jenkins job failed, comment |
/run-cpu-e2e |
issue: #37172