Skip to content

Commit

Permalink
sorter/leveldb(ticdc): replace cleaner with delete range (#4632)
Browse files Browse the repository at this point in the history
ref #4631
  • Loading branch information
overvenus committed Feb 24, 2022
1 parent 1babee1 commit c396df1
Show file tree
Hide file tree
Showing 38 changed files with 367 additions and 881 deletions.
4 changes: 3 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func (c *Capture) reset(ctx context.Context) error {
// Sorter dir has been set and checked when server starts.
// See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
c.sorterSystem = ssystem.NewSystem(sortDir, conf.Debug.DB)
memPercentage :=
float64(config.GetGlobalServerConfig().Sorter.MaxMemoryPercentage) / 100
c.sorterSystem = ssystem.NewSystem(sortDir, memPercentage, conf.Debug.DB)
err = c.sorterSystem.Start(ctx)
if err != nil {
return errors.Annotate(
Expand Down
12 changes: 4 additions & 8 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ type sorterNode struct {
eg *errgroup.Group
cancel context.CancelFunc

cleanID actor.ID
cleanTask message.Message
cleanRouter *actor.Router
cleanup func(context.Context) error

// The latest resolved ts that sorter has received.
resolvedTs model.Ts
Expand Down Expand Up @@ -116,9 +114,7 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg *
levelSorter := leveldb.NewSorter(
ctx, n.tableID, startTs, router, actorID, compactScheduler,
config.GetGlobalServerConfig().Debug.DB)
n.cleanID = actorID
n.cleanTask = levelSorter.CleanupTask()
n.cleanRouter = ctx.GlobalVars().SorterSystem.CleanerRouter()
n.cleanup = levelSorter.CleanupFunc()
eventSorter = levelSorter
} else {
// Sorter dir has been set and checked when server starts.
Expand Down Expand Up @@ -301,9 +297,9 @@ func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) {

func (n *sorterNode) releaseResource(ctx context.Context, changefeedID, captureAddr string) {
defer tableMemoryHistogram.DeleteLabelValues(changefeedID, captureAddr)
if n.cleanRouter != nil {
if n.cleanup != nil {
// Clean up data when the table sorter is canceled.
err := n.cleanRouter.SendB(ctx, n.cleanID, n.cleanTask)
err := n.cleanup(ctx)
if err != nil {
log.Warn("schedule table cleanup task failed", zap.Error(err))
}
Expand Down
219 changes: 0 additions & 219 deletions cdc/sorter/leveldb/cleaner.go

This file was deleted.

Loading

0 comments on commit c396df1

Please sign in to comment.