-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sorter/leveldb(ticdc): replace cleaner with delete range #4632
Conversation
* Add DeleteRange API to DB * Update DB dashboard in grafana * Adjust DB block cahce by sorter.max-memory-percentage Signed-off-by: Neil Shen <[email protected]>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Codecov Report
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## master #4632 +/- ##
================================================
- Coverage 55.6402% 55.0001% -0.6401%
================================================
Files 494 512 +18
Lines 61283 63638 +2355
================================================
+ Hits 34098 35001 +903
- Misses 23750 25154 +1404
- Partials 3435 3483 +48 |
Signed-off-by: Neil Shen <[email protected]>
Signed-off-by: Neil Shen <[email protected]>
cdc/sorter/leveldb/compactor.go
Outdated
return false | ||
// trySchdeduleCompact try to schedule a compact task. | ||
// Returns true if it schedules compact task successfully. | ||
func (s *CompactScheduler) trySchdeduleCompact(id actor.ID, deleteCount int) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (s *CompactScheduler) trySchdeduleCompact(id actor.ID, deleteCount int) bool { | |
func (s *CompactScheduler) tryScheduleCompact(id actor.ID, deleteCount int) bool { |
cdc/sorter/leveldb/leveldb.go
Outdated
@@ -138,6 +138,14 @@ func (ldb *DBActor) close(err error) { | |||
ldb.closedWg.Done() | |||
} | |||
|
|||
func (ldb *DBActor) trySchdeduleCompact() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (ldb *DBActor) trySchdeduleCompact() { | |
func (ldb *DBActor) tryScheduleCompact() { |
Signed-off-by: Neil Shen <[email protected]>
cdc/sorter/leveldb/leveldb.go
Outdated
if len(task.DeleteReq.Range[0]) != 0 && len(task.DeleteReq.Range[1]) != 0 { | ||
// Force write pending write batch before delete range. | ||
if err := ldb.maybeWrite(true); err != nil { | ||
log.Panic("db error", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log tableID?
Signed-off-by: Neil Shen <[email protected]>
Signed-off-by: Neil Shen <[email protected]>
cdc/sorter/leveldb/table_sorter.go
Outdated
// CleanupTask returns a clean up task that delete sorter's data. | ||
func (ls *Sorter) CleanupTask() actormsg.Message { | ||
return actormsg.SorterMessage(message.NewCleanupTask(ls.uid, ls.tableID)) | ||
// CleanupFunc returns a funcation that cleans up sorter's data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// CleanupFunc returns a funcation that cleans up sorter's data. | |
// CleanupFunc returns a function that cleans up sorter's data. |
cdc/sorter/leveldb/table_sorter.go
Outdated
return func(ctx context.Context) error { | ||
task := message.Task{UID: ls.uid, TableID: ls.tableID} | ||
task.DeleteReq = &message.DeleteRequest{ | ||
// We do not set task.Delte.Count, because we don't know |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// We do not set task.Delte.Count, because we don't know | |
// We do not set task.Delete.Count, because we don't know |
pkg/config/sorter.go
Outdated
@@ -48,8 +48,8 @@ func (c *SorterConfig) ValidateAndAdjust() error { | |||
if c.NumWorkerPoolGoroutine < 1 { | |||
return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at least 1, larger than 8 is recommended") | |||
} | |||
if c.MaxMemoryPressure < 0 || c.MaxMemoryPressure > 100 { | |||
return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage") | |||
if c.MaxMemoryPressure <= 0 || c.MaxMemoryPressure > 80 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this SorterConfig. MaxMemoryPressure
field be renamed as MaxMemoryPercentage?
@@ -229,6 +231,20 @@ func (ldb *DBActor) Poll(ctx context.Context, tasks []actormsg.Message) bool { | |||
log.Panic("db error", zap.Error(err)) | |||
} | |||
} | |||
if task.DeleteReq != nil { | |||
ldb.deleteCount += task.DeleteReq.Count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for DBActor, task.DeleteReq.Count is always 0? the value is from CleanFunc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, it's always zero, we will fill Count
in later PRs.
log.Panic("db error", zap.Error(err)) | ||
} | ||
start, end := task.DeleteReq.Range[0], task.DeleteReq.Range[1] | ||
if err := ldb.db.DeleteRange(start, end); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is DeleteRange a blocking function call or non-blocking function call?
Signed-off-by: Neil Shen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
/LGTM |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: c39e522
|
/run-integration-test |
/run-check-issue-triage-complete |
/merge |
/merge |
@overvenus: Your PR was out of date, I have automatically updated it for you. At the same time I will also trigger all tests for you: /run-all-tests If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
What problem does this PR solve?
Issue Number: ref #4631
Check List
Tests
Release note