sink/kafka(ticdc): remove useless partition flush logic #4598
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review. The full list of commands accepted by this bot can be found here. Reviewers can indicate their review by submitting an approval review.
/run-all-tests
/run-all-tests
/run-all-tests
Codecov Report
Flags with carried forward coverage won't be shown. Click here to find out more.

```
@@             Coverage Diff              @@
##             master      #4598       +/- ##
================================================
- Coverage   55.6402%   55.1585%   -0.4818%
================================================
  Files           494        516        +22
  Lines         61283      63836      +2553
================================================
+ Hits          34098      35211      +1113
- Misses        23750      25134      +1384
- Partials       3435       3491        +56
```
/run-all-tests
Force-pushed from dded477 to 0193c24.
I did some local testing and the duration of flush is basically around 10 ms. In addition, I also tried a micro benchmark in which we add a buffer to msgChan so that events can queue up while flush latency is high, but it is not effective either (it is even an order of magnitude slower). This optimization has little effect in the case of large writes, and the flush latency itself should be acceptable when the writes are small.

```go
// buffer
msgChan: make(chan mqEvent, 512),

// Simulate flush delay
func (m *mockProducer) Flush(ctx context.Context) error {
	if len(m.mqEvent) != 0 {
		time.Sleep(10 * time.Millisecond)
	}
	m.mqEvent = make(map[int32][]*codec.MQMessage)
	m.flushed = true
	return nil
}
```
```go
func BenchmarkFlush(b *testing.B) {
	worker, _ := newTestWorker()
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := worker.run(ctx)
		if err != nil && errors.Cause(err) != context.Canceled {
			b.Error(err)
		}
	}()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for i := 0; i < 100000; i++ {
			b.StopTimer()
			if i%100 == 0 {
				time.Sleep(time.Duration(rand.Intn(30)) * time.Millisecond)
			}
			n := rand.Intn(10)
			b.StartTimer()
			if n%2 == 0 {
				worker.msgChan <- mqEvent{
					row: &model.RowChangedEvent{
						CommitTs: 1,
						Table:    &model.TableName{Schema: "a", Table: "b"},
						Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
					},
					partition: int32(i % 10),
				}
			}
			if i%10 == 0 {
				worker.msgChan <- mqEvent{resolvedTs: model.Ts(i)}
			}
		}
	}
	b.StopTimer()
	cancel()
	wg.Wait()
}
```

no-buffer vs. buffered: the benchmark output for the two configurations is not captured here.
/run-all-tests
/run-leak-test
```
@@ -43,21 +43,23 @@ type mqEvent struct {

// flushWorker is responsible for sending messages to the Kafka producer on a batch basis.
type flushWorker struct {
	msgChan chan mqEvent
	ticker  *time.Ticker
```
When is it closed?
I assume it is closed when an error occurs or when the flushWorker is stopped. When an error occurs during run, the error is reported to the processor, which then tries to shut down the sink.
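As a rough illustration of that lifecycle, here is a minimal sketch (not the actual cdc/sink implementation; the types and the empty select cases are hypothetical stand-ins) showing how deferring ticker.Stop inside run ties the ticker's lifetime to the worker: it is released whether run exits because a flush fails or because the context is canceled when the sink is closed.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the real types, just to make the ticker's
// lifetime visible; they are not the actual flushWorker definitions.
type mqEvent struct{}

type flushWorker struct {
	msgChan chan mqEvent
	ticker  *time.Ticker
}

// run owns the ticker: the deferred Stop releases it whenever run returns,
// whether because a flush failed or because the context was canceled.
func (w *flushWorker) run(ctx context.Context) error {
	defer w.ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-w.ticker.C:
			// Flush the batched messages here; a real implementation would
			// return the error, letting the processor close the sink.
		case <-w.msgChan:
			// Append the event to the current batch here.
		}
	}
}

func main() {
	w := &flushWorker{
		msgChan: make(chan mqEvent, 512),
		ticker:  time.NewTicker(500 * time.Millisecond),
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(time.Second)
		cancel() // simulates the sink being closed
	}()
	fmt.Println(w.run(ctx)) // prints "context canceled"
}
```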
What problem does this PR solve?
Issue Number: ref #4423
What is changed and how it works?
Check List
Tests
Code changes
None
Side effects
None
Related changes
None
Release note