diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 79263afccea..f6fd6dd6e60 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -170,7 +170,9 @@ func (c *Capture) reset(ctx context.Context) error { // Sorter dir has been set and checked when server starts. // See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275 sortDir := config.GetGlobalServerConfig().Sorter.SortDir - c.sorterSystem = ssystem.NewSystem(sortDir, conf.Debug.DB) + memPercentage := + float64(config.GetGlobalServerConfig().Sorter.MaxMemoryPercentage) / 100 + c.sorterSystem = ssystem.NewSystem(sortDir, memPercentage, conf.Debug.DB) err = c.sorterSystem.Start(ctx) if err != nil { return errors.Annotate( diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index a76af6d267c..2363e206282 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -55,9 +55,7 @@ type sorterNode struct { eg *errgroup.Group cancel context.CancelFunc - cleanID actor.ID - cleanTask message.Message - cleanRouter *actor.Router + cleanup func(context.Context) error // The latest resolved ts that sorter has received. resolvedTs model.Ts @@ -116,9 +114,7 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg * levelSorter := leveldb.NewSorter( ctx, n.tableID, startTs, router, actorID, compactScheduler, config.GetGlobalServerConfig().Debug.DB) - n.cleanID = actorID - n.cleanTask = levelSorter.CleanupTask() - n.cleanRouter = ctx.GlobalVars().SorterSystem.CleanerRouter() + n.cleanup = levelSorter.CleanupFunc() eventSorter = levelSorter } else { // Sorter dir has been set and checked when server starts. @@ -301,9 +297,9 @@ func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) { func (n *sorterNode) releaseResource(ctx context.Context, changefeedID, captureAddr string) { defer tableMemoryHistogram.DeleteLabelValues(changefeedID, captureAddr) - if n.cleanRouter != nil { + if n.cleanup != nil { // Clean up data when the table sorter is canceled. - err := n.cleanRouter.SendB(ctx, n.cleanID, n.cleanTask) + err := n.cleanup(ctx) if err != nil { log.Warn("schedule table cleanup task failed", zap.Error(err)) } diff --git a/cdc/sorter/leveldb/cleaner.go b/cdc/sorter/leveldb/cleaner.go deleted file mode 100644 index 18cb2ab15b8..00000000000 --- a/cdc/sorter/leveldb/cleaner.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package leveldb - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/sorter/encoding" - "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" - "github.com/pingcap/tiflow/pkg/actor" - actormsg "github.com/pingcap/tiflow/pkg/actor/message" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/db" - "go.uber.org/zap" - "golang.org/x/time/rate" -) - -// CleanerActor is an actor that can clean up table data asynchronously. 
-type CleanerActor struct { - id actor.ID - db db.DB - wbSize int - - deleteCount int - compact *CompactScheduler - - closedWg *sync.WaitGroup - - limiter *rate.Limiter - router *actor.Router -} - -var _ actor.Actor = (*CleanerActor)(nil) - -// NewCleanerActor returns a cleaner actor. -func NewCleanerActor( - id int, db db.DB, router *actor.Router, compact *CompactScheduler, - cfg *config.DBConfig, wg *sync.WaitGroup, -) (*CleanerActor, actor.Mailbox, error) { - wg.Add(1) - wbSize := 500 // default write batch size. - if (cfg.CleanupSpeedLimit / 2) < wbSize { - // wb size must be less than speed limit, otherwise it is easily - // rate-limited. - wbSize = cfg.CleanupSpeedLimit / 2 - } - limiter := rate.NewLimiter(rate.Limit(cfg.CleanupSpeedLimit), wbSize*2) - mb := actor.NewMailbox(actor.ID(id), cfg.Concurrency) - return &CleanerActor{ - id: actor.ID(id), - db: db, - wbSize: wbSize, - compact: compact, - closedWg: wg, - limiter: limiter, - router: router, - }, mb, nil -} - -// Poll implements actor.Actor. -func (clean *CleanerActor) Poll(ctx context.Context, tasks []actormsg.Message) bool { - select { - case <-ctx.Done(): - clean.close(ctx.Err()) - return false - default: - } - - reschedulePos := -1 - rescheduleDelay := time.Duration(0) - batch := clean.db.Batch(0) -TASKS: - for pos := range tasks { - var task message.Task - msg := tasks[pos] - switch msg.Tp { - case actormsg.TypeSorterTask: - task = msg.SorterTask - case actormsg.TypeStop: - clean.close(nil) - return false - default: - log.Panic("unexpected message", zap.Any("message", msg)) - } - if !task.Cleanup { - log.Panic("unexpected message", zap.Any("message", msg)) - } - - start := encoding.EncodeTsKey(task.UID, task.TableID, 0) - limit := encoding.EncodeTsKey(task.UID, task.TableID+1, 0) - iter := clean.db.Iterator(start, limit) - - // Force writes the first batch if the task is rescheduled (rate limited). - force := task.CleanupRatelimited - - for hasNext := iter.Seek(start); hasNext; hasNext = iter.Next() { - batch.Delete(iter.Key()) - - // TODO it's similar to LevelActor.maybeWrite, - // they should be unified. - if int(batch.Count()) >= clean.wbSize { - delay, err := clean.writeRateLimited(batch, force) - if err != nil { - log.Panic("db error", - zap.Error(err), zap.Uint64("id", uint64(clean.id))) - } - if delay != 0 { - // Rate limited, break and reschedule tasks. - // After the delay, this batch can be write forcibly. - reschedulePos = pos - rescheduleDelay = delay - err := iter.Release() - if err != nil { - log.Panic("db error", - zap.Error(err), zap.Uint64("id", uint64(clean.id))) - } - break TASKS - } - force = false - } - } - // Release iterator and snapshot in time. - err := iter.Release() - if err != nil { - log.Panic("db error", - zap.Error(err), zap.Uint64("id", uint64(clean.id))) - } - // Ignore rate limit and force write remaining kv. - _, err = clean.writeRateLimited(batch, true) - if err != nil { - log.Panic("db error", - zap.Error(err), zap.Uint64("id", uint64(clean.id))) - } - } - - // Reschedule rate limited tasks. - if reschedulePos >= 0 { - clean.reschedule(ctx, tasks[reschedulePos:], rescheduleDelay) - } - - return true -} - -func (clean *CleanerActor) close(err error) { - log.Info("cleaner actor quit", - zap.Uint64("ID", uint64(clean.id)), zap.Error(err)) - clean.closedWg.Done() -} - -func (clean *CleanerActor) writeRateLimited( - batch db.Batch, force bool, -) (time.Duration, error) { - count := int(batch.Count()) - // Skip rate limiter, if force write. 
- if !force { - reservation := clean.limiter.ReserveN(time.Now(), count) - if reservation != nil { - if !reservation.OK() { - log.Panic("write batch too large", - zap.Int("wbSize", count), - zap.Int("limit", clean.limiter.Burst())) - } - delay := reservation.Delay() - if delay != 0 { - // Rate limited, wait. - return delay, nil - } - } - } - clean.deleteCount += int(batch.Count()) - err := batch.Commit() - if err != nil { - return 0, errors.Trace(err) - } - batch.Reset() - // Schedule a compact task when there are too many deletion. - if clean.compact.maybeCompact(clean.id, clean.deleteCount) { - // Reset delete key count if schedule compaction successfully. - clean.deleteCount = 0 - } - return 0, nil -} - -func (clean *CleanerActor) reschedule( - ctx context.Context, tasks []actormsg.Message, delay time.Duration, -) { - id := clean.id - msgs := append([]actormsg.Message{}, tasks...) - // Reschedule tasks respect after delay. - time.AfterFunc(delay, func() { - for i := range msgs { - // Mark the first task is rescheduled due to rate limit. - if i == 0 { - msgs[i].SorterTask.CleanupRatelimited = true - } - // Blocking send to ensure that no tasks are lost. - err := clean.router.SendB(ctx, id, msgs[i]) - if err != nil { - log.Warn("drop table clean-up task", - zap.Uint64("tableID", msgs[i].SorterTask.TableID)) - } - } - }) -} diff --git a/cdc/sorter/leveldb/cleaner_test.go b/cdc/sorter/leveldb/cleaner_test.go deleted file mode 100644 index 9ea1915dd27..00000000000 --- a/cdc/sorter/leveldb/cleaner_test.go +++ /dev/null @@ -1,394 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package leveldb - -import ( - "context" - "encoding/hex" - "fmt" - "sync" - "testing" - "time" - - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sorter/encoding" - "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" - "github.com/pingcap/tiflow/pkg/actor" - actormsg "github.com/pingcap/tiflow/pkg/actor/message" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/db" - "github.com/stretchr/testify/require" -) - -func makeCleanTask(uid uint32, tableID uint64) []actormsg.Message { - return []actormsg.Message{actormsg.SorterMessage(message.Task{ - UID: uid, - TableID: tableID, - Cleanup: true, - })} -} - -func prepareData(t *testing.T, db db.DB, data [][]int) { - wb := db.Batch(0) - for _, d := range data { - count, uid, tableID := d[0], d[1], d[2] - for k := 0; k < count; k++ { - key := encoding.EncodeKey( - uint32(uid), uint64(tableID), - model.NewPolymorphicEvent(&model.RawKVEntry{ - OpType: model.OpTypeDelete, - Key: []byte{byte(k)}, - StartTs: 1, - CRTs: 2, - })) - wb.Put(key, key) - } - } - require.Nil(t, wb.Commit()) -} - -func TestCleanerPoll(t *testing.T) { - t.Parallel() - ctx := context.Background() - cfg := config.GetDefaultServerConfig().Clone().Debug.DB - cfg.Count = 1 - - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) - require.Nil(t, err) - closedWg := new(sync.WaitGroup) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) - clean, _, err := NewCleanerActor(1, db, nil, compact, cfg, closedWg) - require.Nil(t, err) - - // Put data to db. - // * 1 key of uid1 table1 - // * 3 key of uid2 table1 - // * 2 key of uid3 table2 - // * 1 key of uid4 table2 - data := [][]int{ - {1, 1, 1}, - {3, 2, 1}, - {2, 3, 2}, - {1, 4, 2}, - } - prepareData(t, db, data) - - // Ensure there are some key/values belongs to uid2 table1. - start := encoding.EncodeTsKey(2, 1, 0) - limit := encoding.EncodeTsKey(2, 2, 0) - iter := db.Iterator(start, limit) - require.True(t, iter.First()) - require.Nil(t, iter.Release()) - - // Clean up uid2 table1 - closed := !clean.Poll(ctx, makeCleanTask(2, 1)) - require.False(t, closed) - - // Ensure no key/values belongs to uid2 table1 - iter = db.Iterator(start, limit) - require.False(t, iter.First()) - require.Nil(t, iter.Release()) - - // Ensure uid1 table1 is untouched. - start = encoding.EncodeTsKey(1, 1, 0) - limit = encoding.EncodeTsKey(1, 2, 0) - iter = db.Iterator(start, limit) - require.True(t, iter.First()) - require.Nil(t, iter.Release()) - - // Ensure uid3 table2 is untouched. - start = encoding.EncodeTsKey(3, 2, 0) - limit = encoding.EncodeTsKey(3, 3, 0) - iter = db.Iterator(start, limit) - require.True(t, iter.First()) - require.Nil(t, iter.Release()) - - // Clean up uid3 table2 - closed = !clean.Poll(ctx, makeCleanTask(3, 2)) - require.False(t, closed) - - // Ensure no key/values belongs to uid3 table2 - iter = db.Iterator(start, limit) - require.False(t, iter.First()) - require.Nil(t, iter.Release()) - - // Ensure uid4 table2 is untouched. - start = encoding.EncodeTsKey(4, 2, 0) - limit = encoding.EncodeTsKey(4, 3, 0) - iter = db.Iterator(start, limit) - require.True(t, iter.First()) - require.Nil(t, iter.Release()) - - // Close leveldb. 
- closed = !clean.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) - require.True(t, closed) - closedWg.Wait() - require.Nil(t, db.Close()) -} - -func TestCleanerContextCancel(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - cfg := config.GetDefaultServerConfig().Clone().Debug.DB - cfg.Count = 1 - - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) - require.Nil(t, err) - closedWg := new(sync.WaitGroup) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) - clean, _, err := NewCleanerActor(1, db, nil, compact, cfg, closedWg) - require.Nil(t, err) - - cancel() - tasks := makeCleanTask(1, 1) - closed := !clean.Poll(ctx, tasks) - require.True(t, closed) - closedWg.Wait() - require.Nil(t, db.Close()) -} - -func TestCleanerWriteRateLimited(t *testing.T) { - t.Parallel() - ctx := context.Background() - cfg := config.GetDefaultServerConfig().Clone().Debug.DB - cfg.Count = 1 - cfg.CleanupSpeedLimit = 4 - // wbSize = cleanup speed limit / 2 - - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) - require.Nil(t, err) - closedWg := new(sync.WaitGroup) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) - clean, _, err := NewCleanerActor(1, db, nil, compact, cfg, closedWg) - require.Nil(t, err) - - // Put data to db. - // * 1 key of uid1 table1 - // * 3 key of uid2 table1 - // * 2 key of uid3 table2 - // * 1 key of uid4 table2 - data := [][]int{ - {1, 1, 1}, - {3, 2, 1}, - {2, 3, 2}, - {1, 4, 2}, - } - prepareData(t, db, data) - - keys := [][]byte{} - start := encoding.EncodeTsKey(0, 0, 0) - limit := encoding.EncodeTsKey(5, 0, 0) - iter := db.Iterator(start, limit) - for iter.Next() { - key := append([]byte{}, iter.Key()...) - keys = append(keys, key) - } - require.Nil(t, iter.Release()) - require.Equal(t, 7, len(keys), "%v", keys) - - // Must speed limited. - wb := db.Batch(0) - var delay time.Duration - var count int - for { - for i := 0; i < cfg.CleanupSpeedLimit/2; i++ { - wb.Delete(keys[i]) - } - delay, err = clean.writeRateLimited(wb, false) - require.Nil(t, err) - if delay != 0 { - break - } - count++ - } - - // Sleep and write again. - time.Sleep(delay * 4) - delay, err = clean.writeRateLimited(wb, false) - require.EqualValues(t, 0, delay) - require.Nil(t, err) - - // Force write ignores speed limit. - for i := 0; i < count*2; i++ { - delay, err = clean.writeRateLimited(wb, true) - require.EqualValues(t, 0, delay) - require.Nil(t, err) - } - - // Close leveldb. 
- closed := !clean.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) - require.True(t, closed) - closedWg.Wait() - require.Nil(t, db.Close()) -} - -func TestCleanerTaskRescheduled(t *testing.T) { - t.Parallel() - ctx := context.Background() - cfg := config.GetDefaultServerConfig().Clone().Debug.DB - cfg.Count = 1 - cfg.CleanupSpeedLimit = 4 - // wbSize = cleanup speed limit / 2 - - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) - require.Nil(t, err) - closedWg := new(sync.WaitGroup) - router := actor.NewRouter(t.Name()) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) - clean, mb, err := NewCleanerActor(1, db, router, compact, cfg, closedWg) - require.Nil(t, err) - router.InsertMailbox4Test(actor.ID(1), mb) - require.Nil(t, router.SendB(ctx, actor.ID(1), actormsg.TickMessage())) - receiveTimeout := func() (actormsg.Message, bool) { - for i := 0; i < 10; i++ { // 2s - time.Sleep(200 * time.Millisecond) - task, ok := mb.Receive() - if ok { - return task, ok - } - } - return mb.Receive() - } - mustReceive := func() actormsg.Message { - task, ok := receiveTimeout() - if !ok { - t.Fatal("timeout") - } - return task - } - _ = mustReceive() - - // Put data to db. - // * 8 key of uid1 table1 - // * 2 key of uid2 table1 - // * 2 key of uid3 table2 - data := [][]int{ - {8, 1, 1}, - {2, 2, 1}, - {2, 3, 2}, - } - prepareData(t, db, data) - - tasks := makeCleanTask(1, 1) - tasks = append(tasks, makeCleanTask(2, 1)...) - tasks = append(tasks, makeCleanTask(3, 2)...) - - // All tasks must be rescheduled. - closed := !clean.Poll(ctx, tasks) - require.False(t, closed) - // uid1 table1 - task := mustReceive() - msg := makeCleanTask(1, 1) - msg[0].SorterTask.CleanupRatelimited = true - require.EqualValues(t, msg[0], task) - tasks[0] = task - // uid2 tabl2 - task = mustReceive() - msg = makeCleanTask(2, 1) - require.EqualValues(t, msg[0], task) - tasks[1] = task - // uid3 tabl2 - task = mustReceive() - msg = makeCleanTask(3, 2) - require.EqualValues(t, msg[0], task) - tasks[2] = task - - // Reschedule tasks. - // All tasks can finish eventually. - closed = !clean.Poll(ctx, tasks) - require.False(t, closed) - for { - task, ok := receiveTimeout() - if !ok { - break - } - closed := !clean.Poll(ctx, []actormsg.Message{task}) - require.False(t, closed) - } - - // Ensure all data are deleted. - start := encoding.EncodeTsKey(0, 0, 0) - limit := encoding.EncodeTsKey(4, 0, 0) - iter := db.Iterator(start, limit) - require.False(t, iter.First(), fmt.Sprintln(hex.EncodeToString(iter.Key()))) - require.Nil(t, iter.Release()) - - // Close leveldb. - closed = !clean.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) - require.True(t, closed) - closedWg.Wait() - // TODO: find a better to test if iterators are leaked. 
- // stats := leveldb.DBStats{} - // require.Nil(t, db.Stats(&stats)) - // require.Zero(t, stats.AliveIterators) - require.Nil(t, db.Close()) -} - -func TestCleanerCompact(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cfg := config.GetDefaultServerConfig().Clone().Debug.DB - cfg.Count = 1 - - id := 1 - db, err := db.OpenLevelDB(ctx, id, t.TempDir(), cfg) - require.Nil(t, err) - closedWg := new(sync.WaitGroup) - compactRouter := actor.NewRouter(t.Name()) - compactMB := actor.NewMailbox(actor.ID(id), 1) - compactRouter.InsertMailbox4Test(compactMB.ID(), compactMB) - compact := NewCompactScheduler(compactRouter, cfg) - cleaner, _, err := NewCleanerActor(id, db, nil, compact, cfg, closedWg) - require.Nil(t, err) - - // Lower compactThreshold to speed up tests. - compact.compactThreshold = 2 - cleaner.wbSize = 1 - - // Put data to db. - // * 1 key of uid1 table1 - // * 2 key of uid2 table1 - data := [][]int{ - {1, 1, 1}, - {2, 2, 1}, - } - prepareData(t, db, data) - - // Empty task must not trigger compact. - closed := !cleaner.Poll(ctx, makeCleanTask(0, 0)) - require.False(t, closed) - _, ok := compactMB.Receive() - require.False(t, ok) - - // Delete 2 keys must trigger compact. - closed = !cleaner.Poll(ctx, makeCleanTask(2, 1)) - require.False(t, closed) - _, ok = compactMB.Receive() - require.True(t, ok) - - // Delete 1 key must not trigger compact. - closed = !cleaner.Poll(ctx, makeCleanTask(1, 1)) - require.False(t, closed) - _, ok = compactMB.Receive() - require.False(t, ok) - - // Close db. - closed = !cleaner.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) - require.True(t, closed) - closedWg.Wait() - require.Nil(t, db.Close()) -} diff --git a/cdc/sorter/leveldb/compactor.go b/cdc/sorter/leveldb/compactor.go index cc211572817..f9fae191f61 100644 --- a/cdc/sorter/leveldb/compactor.go +++ b/cdc/sorter/leveldb/compactor.go @@ -16,11 +16,13 @@ package leveldb import ( "bytes" "context" + "math/rand" "strconv" "sync" "time" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" "github.com/pingcap/tiflow/pkg/actor" actormsg "github.com/pingcap/tiflow/pkg/actor/message" "github.com/pingcap/tiflow/pkg/config" @@ -30,11 +32,45 @@ import ( "go.uber.org/zap" ) +type deleteThrottle struct { + count int + nextTime time.Time + rnd *rand.Rand + + // The number of delete keys that triggers delete. + countThreshold int + period time.Duration +} + +func (d *deleteThrottle) reset(now time.Time) { + // Randomize next time to avoid thundering herd problem. + randFactor := d.rnd.Int63n(int64(d.period)) + period := d.period + time.Duration(randFactor) + d.nextTime = now.Add(period) + d.count = 0 +} + +func (d *deleteThrottle) trigger(count int, now time.Time) bool { + if d.rnd == nil { + // Init rnd. + d.rnd = rand.New(rand.NewSource(rand.Int63())) + d.reset(now) + } + d.count += count + if d.count >= d.countThreshold || now.After(d.nextTime) { + // Throttle is triggered, reset before returning true. + d.reset(now) + return true + } + return false +} + // CompactActor is an actor that compacts db. // It GCs delete kv entries and reclaim disk space. type CompactActor struct { id actor.ID db db.DB + delete deleteThrottle closedWg *sync.WaitGroup metricCompactDuration prometheus.Observer @@ -44,7 +80,7 @@ var _ actor.Actor = (*CompactActor)(nil) // NewCompactActor returns a compactor actor. 
func NewCompactActor(
-	id int, db db.DB, wg *sync.WaitGroup, captureAddr string,
+	id int, db db.DB, wg *sync.WaitGroup, cfg *config.DBConfig, captureAddr string,
 ) (*CompactActor, actor.Mailbox, error) {
 	wg.Add(1)
 	idTag := strconv.Itoa(id)
@@ -54,6 +90,10 @@ func NewCompactActor(
 		id:       actor.ID(id),
 		db:       db,
 		closedWg: wg,
+		delete: deleteThrottle{
+			countThreshold: cfg.CompactionDeletionThreshold,
+			period:         time.Duration(cfg.CompactionPeriod * int(time.Second)),
+		},
 		metricCompactDuration: sorterCompactDurationHistogram.WithLabelValues(captureAddr, idTag),
 	}, mb, nil
@@ -69,10 +109,12 @@ func (c *CompactActor) Poll(ctx context.Context, tasks []actormsg.Message) bool
 	}
 
 	// Only compact once for every batch.
+	count := 0
 	for pos := range tasks {
 		msg := tasks[pos]
 		switch msg.Tp {
-		case actormsg.TypeTick:
+		case actormsg.TypeSorterTask:
+			count += msg.SorterTask.DeleteReq.Count
 		case actormsg.TypeStop:
 			c.close(nil)
 			return false
@@ -81,10 +123,14 @@ func (c *CompactActor) Poll(ctx context.Context, tasks []actormsg.Message) bool
 		}
 	}
 
+	now := time.Now()
+	if !c.delete.trigger(count, now) {
+		return true
+	}
+
 	// A range that is large enough to cover entire db effectively.
 	// See see sorter/encoding/key.go.
 	start, end := []byte{0x0}, bytes.Repeat([]byte{0xff}, 128)
-	now := time.Now()
 	if err := c.db.Compact(start, end); err != nil {
 		log.Error("db compact error", zap.Error(err))
 	}
@@ -100,28 +146,26 @@ func (c *CompactActor) close(err error) {
 }
 
 // NewCompactScheduler returns a new compact scheduler.
-func NewCompactScheduler(
-	router *actor.Router, cfg *config.DBConfig,
-) *CompactScheduler {
-	return &CompactScheduler{
-		router:           router,
-		compactThreshold: cfg.CompactionDeletionThreshold,
-	}
+func NewCompactScheduler(router *actor.Router) *CompactScheduler {
+	return &CompactScheduler{router: router}
 }
 
 // CompactScheduler schedules compact tasks to compactors.
 type CompactScheduler struct {
 	// A router to compactors.
 	router *actor.Router
-	// The number of delete keys that triggers compact.
-	compactThreshold int
 }
 
-func (s *CompactScheduler) maybeCompact(id actor.ID, deleteCount int) bool {
-	if deleteCount < s.compactThreshold {
-		return false
+// tryScheduleCompact tries to schedule a compact task.
+// Returns true if it schedules the compact task successfully.
+func (s *CompactScheduler) tryScheduleCompact(id actor.ID, deleteCount int) bool {
+	task := message.Task{
+		DeleteReq: &message.DeleteRequest{
+			// Compactor only needs the count. DeleteRange is written by the db actor.
+			Count: deleteCount,
+		},
 	}
-	err := s.router.Send(id, actormsg.TickMessage())
+	err := s.router.Send(id, actormsg.SorterMessage(task))
 	// An ongoing compaction may block compactor and cause channel full,
 	// skip send the task as there is a pending task.
if err != nil && cerrors.ErrMailboxFull.NotEqual(err) { diff --git a/cdc/sorter/leveldb/compactor_test.go b/cdc/sorter/leveldb/compactor_test.go index 90a755f5979..0f9c5ffaf74 100644 --- a/cdc/sorter/leveldb/compactor_test.go +++ b/cdc/sorter/leveldb/compactor_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" "github.com/pingcap/tiflow/pkg/actor" actormsg "github.com/pingcap/tiflow/pkg/actor/message" "github.com/pingcap/tiflow/pkg/config" @@ -42,14 +43,40 @@ func TestCompactorPoll(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg) require.Nil(t, err) mockDB := mockCompactDB{DB: db, compact: make(chan struct{}, 1)} closedWg := new(sync.WaitGroup) - compactor, _, err := NewCompactActor(1, &mockDB, closedWg, "") + cfg.CompactionDeletionThreshold = 2 + cfg.CompactionPeriod = 1 + compactor, _, err := NewCompactActor(1, &mockDB, closedWg, cfg, "") require.Nil(t, err) - closed := !compactor.Poll(ctx, []actormsg.Message{actormsg.TickMessage()}) + // Must not trigger compact. + task := message.Task{DeleteReq: &message.DeleteRequest{}} + task.DeleteReq.Count = 0 + closed := !compactor.Poll(ctx, []actormsg.Message{actormsg.SorterMessage(task)}) + require.False(t, closed) + select { + case <-mockDB.compact: + t.Fatal("Must trigger compact") + case <-time.After(500 * time.Millisecond): + } + + // Must trigger compact. + task.DeleteReq.Count = 2 * cfg.CompactionDeletionThreshold + closed = !compactor.Poll(ctx, []actormsg.Message{actormsg.SorterMessage(task)}) + require.False(t, closed) + select { + case <-time.After(5 * time.Second): + t.Fatal("Must trigger compact") + case <-mockDB.compact: + } + + // Must trigger compact. + time.Sleep(time.Duration(cfg.CompactionPeriod) * time.Second * 2) + task.DeleteReq.Count = cfg.CompactionDeletionThreshold / 2 + closed = !compactor.Poll(ctx, []actormsg.Message{actormsg.SorterMessage(task)}) require.False(t, closed) select { case <-time.After(5 * time.Second): @@ -70,14 +97,15 @@ func TestComactorContextCancel(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg) require.Nil(t, err) closedWg := new(sync.WaitGroup) - ldb, _, err := NewCompactActor(0, db, closedWg, "") + ldb, _, err := NewCompactActor(0, db, closedWg, cfg, "") require.Nil(t, err) cancel() - closed := !ldb.Poll(ctx, []actormsg.Message{actormsg.TickMessage()}) + closed := !ldb.Poll( + ctx, []actormsg.Message{actormsg.SorterMessage(message.Task{})}) require.True(t, closed) closedWg.Wait() require.Nil(t, db.Close()) @@ -88,26 +116,35 @@ func TestScheduleCompact(t *testing.T) { router := actor.NewRouter(t.Name()) mb := actor.NewMailbox(actor.ID(1), 1) router.InsertMailbox4Test(mb.ID(), mb) - compact := NewCompactScheduler( - router, config.GetDefaultServerConfig().Debug.DB) - compact.compactThreshold = 2 + compact := NewCompactScheduler(router) - // Too few deletion, should not trigger compact. - require.False(t, compact.maybeCompact(mb.ID(), 1)) - _, ok := mb.Receive() - require.False(t, ok) - // Must trigger compact. - require.True(t, compact.maybeCompact(mb.ID(), 3)) + // Must schedule successfully. 
+ require.True(t, compact.tryScheduleCompact(mb.ID(), 3)) msg, ok := mb.Receive() require.True(t, ok) - require.EqualValues(t, actormsg.TickMessage(), msg) + task := message.Task{DeleteReq: &message.DeleteRequest{}} + task.DeleteReq.Count = 3 + require.EqualValues(t, actormsg.SorterMessage(task), msg) // Skip sending unnecessary tasks. - require.True(t, compact.maybeCompact(mb.ID(), 3)) - require.True(t, compact.maybeCompact(mb.ID(), 3)) + require.True(t, compact.tryScheduleCompact(mb.ID(), 3)) + require.True(t, compact.tryScheduleCompact(mb.ID(), 3)) msg, ok = mb.Receive() require.True(t, ok) - require.EqualValues(t, actormsg.TickMessage(), msg) + require.EqualValues(t, actormsg.SorterMessage(task), msg) _, ok = mb.Receive() require.False(t, ok) } + +func TestDeleteThrottle(t *testing.T) { + t.Parallel() + dt := deleteThrottle{ + countThreshold: 2, + period: 1 * time.Second, + } + + require.False(t, dt.trigger(1, time.Now())) + require.True(t, dt.trigger(3, time.Now())) + time.Sleep(2 * dt.period) + require.True(t, dt.trigger(0, time.Now())) +} diff --git a/cdc/sorter/leveldb/leveldb.go b/cdc/sorter/leveldb/leveldb.go index dce80ef12f3..8a5b7ae51fe 100644 --- a/cdc/sorter/leveldb/leveldb.go +++ b/cdc/sorter/leveldb/leveldb.go @@ -138,6 +138,14 @@ func (ldb *DBActor) close(err error) { ldb.closedWg.Done() } +func (ldb *DBActor) tryScheduleCompact() { + // Schedule a compact task when there are too many deletion. + if ldb.compact.tryScheduleCompact(ldb.id, ldb.deleteCount) { + // Reset delete key count if schedule compaction successfully. + ldb.deleteCount = 0 + } +} + func (ldb *DBActor) maybeWrite(force bool) error { bytes := len(ldb.wb.Repr()) if bytes >= ldb.wbSize || (force && bytes != 0) { @@ -155,12 +163,6 @@ func (ldb *DBActor) maybeWrite(force bool) error { } else { ldb.wb = ldb.db.Batch(ldb.wbCap) } - - // Schedule a compact task when there are too many deletion. - if ldb.compact.maybeCompact(ldb.id, ldb.deleteCount) { - // Reset delete key count if schedule compaction successfully. - ldb.deleteCount = 0 - } } return nil } @@ -215,7 +217,7 @@ func (ldb *DBActor) Poll(ctx context.Context, tasks []actormsg.Message) bool { log.Panic("unexpected message", zap.Any("message", msg)) } - for k, v := range task.Events { + for k, v := range task.WriteReq { if len(v) != 0 { ldb.wb.Put([]byte(k), v) } else { @@ -229,6 +231,22 @@ func (ldb *DBActor) Poll(ctx context.Context, tasks []actormsg.Message) bool { log.Panic("db error", zap.Error(err)) } } + if task.DeleteReq != nil { + ldb.deleteCount += task.DeleteReq.Count + if len(task.DeleteReq.Range[0]) != 0 && len(task.DeleteReq.Range[1]) != 0 { + // Force write pending write batch before delete range. + if err := ldb.maybeWrite(true); err != nil { + log.Panic("db error", + zap.Error(err), zap.Uint64("tableID", task.TableID)) + } + start, end := task.DeleteReq.Range[0], task.DeleteReq.Range[1] + if err := ldb.db.DeleteRange(start, end); err != nil { + log.Panic("db error", + zap.Error(err), zap.Uint64("tableID", task.TableID)) + } + ldb.tryScheduleCompact() + } + } if task.IterReq != nil { // Append to slice for later batch acquiring iterators. 
ldb.iterQ.push(task.UID, task.TableID, task.IterReq) diff --git a/cdc/sorter/leveldb/leveldb_test.go b/cdc/sorter/leveldb/leveldb_test.go index b06847e6d53..afc3ffc67a6 100644 --- a/cdc/sorter/leveldb/leveldb_test.go +++ b/cdc/sorter/leveldb/leveldb_test.go @@ -46,10 +46,10 @@ func TestMaybeWrite(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg) require.Nil(t, err) closedWg := new(sync.WaitGroup) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) + compact := NewCompactScheduler(actor.NewRouter(t.Name())) ldb, _, err := NewDBActor(0, db, cfg, compact, closedWg, "") require.Nil(t, err) @@ -92,53 +92,53 @@ func TestCompact(t *testing.T) { cfg.Count = 1 id := 1 - db, err := db.OpenLevelDB(ctx, id, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, id, t.TempDir(), 0, cfg) require.Nil(t, err) closedWg := new(sync.WaitGroup) compactRouter := actor.NewRouter(t.Name()) compactMB := actor.NewMailbox(actor.ID(id), 1) compactRouter.InsertMailbox4Test(compactMB.ID(), compactMB) - compact := NewCompactScheduler(compactRouter, cfg) + compact := NewCompactScheduler(compactRouter) ldb, _, err := NewDBActor(id, db, cfg, compact, closedWg, "") require.Nil(t, err) - // Lower compactThreshold to speed up tests. - compact.compactThreshold = 2 - // Empty task must not trigger compact. task, iterCh := makeTask(make(map[message.Key][]byte), [][]byte{{0x00}, {0xff}}) - closed := !ldb.Poll(ctx, task) - require.False(t, closed) - <-iterCh + require.True(t, ldb.Poll(ctx, task)) + iter := <-iterCh + iter.Release() _, ok := compactMB.Receive() require.False(t, ok) - // Delete 3 keys must trigger compact. - dels := map[message.Key][]byte{"a": {}, "b": {}, "c": {}} - task, iterCh = makeTask(dels, [][]byte{{0x00}, {0xff}}) - closed = !ldb.Poll(ctx, task) - require.False(t, closed) - <-iterCh + // Empty delete range task must not trigger compact. + task = makeDelTask([2][]byte{}, 0) + require.True(t, ldb.Poll(ctx, task)) _, ok = compactMB.Receive() - require.True(t, ok) + require.False(t, ok) - // Delete 1 key must not trigger compact. - dels = map[message.Key][]byte{"a": {}} - task, iterCh = makeTask(dels, [][]byte{{0x00}, {0xff}}) - closed = !ldb.Poll(ctx, task) - require.False(t, closed) - <-iterCh + // A valid delete range task must trigger compact. + task = makeDelTask([2][]byte{{0x00}, {0xff}}, 3) + require.True(t, ldb.Poll(ctx, task)) _, ok = compactMB.Receive() - require.False(t, ok) + require.True(t, ok) // Close db. 
- closed = !ldb.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) + closed := !ldb.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}) require.True(t, closed) closedWg.Wait() require.Nil(t, db.Close()) } -func makeTask(events map[message.Key][]byte, rg [][]byte) ([]actormsg.Message, chan *message.LimitedIterator) { +func makeDelTask(delRange [2][]byte, count int) []actormsg.Message { + return []actormsg.Message{actormsg.SorterMessage(message.Task{ + DeleteReq: &message.DeleteRequest{ + Range: delRange, + Count: count, + }, + })} +} + +func makeTask(writes map[message.Key][]byte, rg [][]byte) ([]actormsg.Message, chan *message.LimitedIterator) { var iterReq *message.IterRequest var iterCh chan *message.LimitedIterator if len(rg) != 0 { @@ -149,8 +149,8 @@ func makeTask(events map[message.Key][]byte, rg [][]byte) ([]actormsg.Message, c } } return []actormsg.Message{actormsg.SorterMessage(message.Task{ - Events: events, - IterReq: iterReq, + WriteReq: writes, + IterReq: iterReq, })}, iterCh } @@ -161,15 +161,15 @@ func TestPutReadDelete(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg) require.Nil(t, err) closedWg := new(sync.WaitGroup) - compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg) + compact := NewCompactScheduler(actor.NewRouter(t.Name())) ldb, _, err := NewDBActor(0, db, cfg, compact, closedWg, "") require.Nil(t, err) // Put only. - tasks, iterCh := makeTask(map[message.Key][]byte{"key": {}}, nil) + tasks, iterCh := makeTask(map[message.Key][]byte{"key": []byte("value")}, nil) require.Nil(t, iterCh) closed := !ldb.Poll(ctx, tasks) require.False(t, closed) @@ -204,7 +204,9 @@ func TestPutReadDelete(t *testing.T) { require.Nil(t, iter.Release()) // Delete and read. - tasks, iterCh = makeTask(map[message.Key][]byte{"key": {}}, [][]byte{{0x00}, {0xff}}) + tasks = makeDelTask([2][]byte{{0x00}, {0xff}}, 0) + iterTasks, iterCh := makeTask(make(map[message.Key][]byte), [][]byte{{0x00}, {0xff}}) + tasks = append(tasks, iterTasks...) closed = !ldb.Poll(ctx, tasks) require.False(t, closed) iter, ok = <-iterCh @@ -226,13 +228,13 @@ func TestAcquireIterators(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg) + db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg) require.Nil(t, err) closedWg := new(sync.WaitGroup) // Set max iterator count to 1. 
cfg.Concurrency = 1
-	compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg)
+	compact := NewCompactScheduler(actor.NewRouter(t.Name()))
 	ldb, _, err := NewDBActor(0, db, cfg, compact, closedWg, "")
 	require.Nil(t, err)
 
@@ -314,10 +316,10 @@ func TestModelChecking(t *testing.T) {
 	cfg := config.GetDefaultServerConfig().Clone().Debug.DB
 	cfg.Count = 1
 
-	db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg)
+	db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg)
 	require.Nil(t, err)
 	closedWg := new(sync.WaitGroup)
-	compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg)
+	compact := NewCompactScheduler(actor.NewRouter(t.Name()))
 	ldb, _, err := NewDBActor(0, db, cfg, compact, closedWg, "")
 	require.Nil(t, err)
 
@@ -402,10 +404,10 @@ func TestContextCancel(t *testing.T) {
 	cfg := config.GetDefaultServerConfig().Clone().Debug.DB
 	cfg.Count = 1
 
-	db, err := db.OpenLevelDB(ctx, 1, t.TempDir(), cfg)
+	db, err := db.OpenPebble(ctx, 1, t.TempDir(), 0, cfg)
 	require.Nil(t, err)
 	closedWg := new(sync.WaitGroup)
-	compact := NewCompactScheduler(actor.NewRouter(t.Name()), cfg)
+	compact := NewCompactScheduler(actor.NewRouter(t.Name()))
 	ldb, _, err := NewDBActor(0, db, cfg, compact, closedWg, "")
 	require.Nil(t, err)
 
diff --git a/cdc/sorter/leveldb/message/task.go b/cdc/sorter/leveldb/message/task.go
index 319d85f1e70..cf99fa7d321 100644
--- a/cdc/sorter/leveldb/message/task.go
+++ b/cdc/sorter/leveldb/message/task.go
@@ -27,15 +27,19 @@ type Task struct {
 	UID     uint32
 	TableID uint64
 
-	// encoded key -> serde.marshal(event)
-	// If a value is empty, it deletes the key/value entry in db.
-	Events map[Key][]byte
+	// A batch of events (encoded as bytes) that need to be written.
+	WriteReq map[Key][]byte
 	// Requests an iterator when it is not nil.
 	IterReq *IterRequest
+	// Deletes all of the key-value pairs in the range.
+	DeleteReq *DeleteRequest
+}
 
-	// For clean-up table task.
-	Cleanup            bool
-	CleanupRatelimited bool
+// DeleteRequest is a request to delete a range.
+type DeleteRequest struct {
+	Range [2][]byte
+	// Approximate number of key-value pairs in the range.
+	Count int
 }
 
 // IterRequest contains parameters that necessary to build an iterator.
@@ -74,12 +78,3 @@ func (s *LimitedIterator) Release() error {
 	s.Sema.Release(1)
 	return errors.Trace(s.Iterator.Release())
 }
-
-// NewCleanupTask returns a clean up task to clean up table data.
-func NewCleanupTask(uid uint32, tableID uint64) Task { - return Task{ - TableID: tableID, - UID: uid, - Cleanup: true, - } -} diff --git a/cdc/sorter/leveldb/message/task_test.go b/cdc/sorter/leveldb/message/task_test.go index d341764f16f..3ae0722a676 100644 --- a/cdc/sorter/leveldb/message/task_test.go +++ b/cdc/sorter/leveldb/message/task_test.go @@ -35,11 +35,3 @@ func TestPrint(t *testing.T) { require.Equal(t, "uid: 1, tableID: 2, startTs: 0, CRTs: 3", Key(encoding.EncodeTsKey(1, 2, 3)).String()) } - -func TestNewCleanupTask(t *testing.T) { - t.Parallel() - task := NewCleanupTask(1, 2) - require.True(t, task.Cleanup) - require.EqualValues(t, 1, task.UID) - require.EqualValues(t, 2, task.TableID) -} diff --git a/cdc/sorter/leveldb/system/system.go b/cdc/sorter/leveldb/system/system.go index b55445c9267..739be94e44c 100644 --- a/cdc/sorter/leveldb/system/system.go +++ b/cdc/sorter/leveldb/system/system.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/util/memory" lsorter "github.com/pingcap/tiflow/cdc/sorter/leveldb" "github.com/pingcap/tiflow/pkg/actor" "github.com/pingcap/tiflow/pkg/actor/message" @@ -47,13 +48,12 @@ const ( type System struct { dbs []db.DB dbSystem *actor.System - dbRouter *actor.Router - cleanSystem *actor.System - cleanRouter *actor.Router + DBRouter *actor.Router compactSystem *actor.System compactRouter *actor.Router compactSched *lsorter.CompactScheduler dir string + memPercentage float64 cfg *config.DBConfig closedCh chan struct{} closedWg *sync.WaitGroup @@ -63,23 +63,20 @@ type System struct { } // NewSystem returns a system. -func NewSystem(dir string, cfg *config.DBConfig) *System { - dbSystem, dbRouter := actor.NewSystemBuilder("sorter"). +func NewSystem(dir string, memPercentage float64, cfg *config.DBConfig) *System { + dbSystem, dbRouter := actor.NewSystemBuilder("sorter-db"). WorkerNumber(cfg.Count).Build() - cleanSystem, cleanRouter := actor.NewSystemBuilder("cleaner"). + compactSystem, compactRouter := actor.NewSystemBuilder("sorter-compactor"). WorkerNumber(cfg.Count).Build() - compactSystem, compactRouter := actor.NewSystemBuilder("compactor"). - WorkerNumber(cfg.Count).Build() - compactSched := lsorter.NewCompactScheduler(compactRouter, cfg) + compactSched := lsorter.NewCompactScheduler(compactRouter) return &System{ dbSystem: dbSystem, - dbRouter: dbRouter, - cleanSystem: cleanSystem, - cleanRouter: cleanRouter, + DBRouter: dbRouter, compactSystem: compactSystem, compactRouter: compactRouter, compactSched: compactSched, dir: dir, + memPercentage: memPercentage, cfg: cfg, closedCh: make(chan struct{}), closedWg: new(sync.WaitGroup), @@ -99,12 +96,7 @@ func (s *System) ActorID(tableID uint64) actor.ID { // Router returns db actors router. func (s *System) Router() *actor.Router { - return s.dbRouter -} - -// CleanerRouter returns cleaner actors router. -func (s *System) CleanerRouter() *actor.Router { - return s.cleanRouter + return s.DBRouter } // CompactScheduler returns compaction scheduler. @@ -139,19 +131,22 @@ func (s *System) Start(ctx context.Context) error { s.compactSystem.Start(ctx) s.dbSystem.Start(ctx) - s.cleanSystem.Start(ctx) captureAddr := config.GetGlobalServerConfig().AdvertiseAddr - dbCount := s.cfg.Count - for id := 0; id < dbCount; id++ { + totalMemory, err := memory.MemTotal() + if err != nil { + return errors.Trace(err) + } + memInBytePerDB := float64(totalMemory) * s.memPercentage / float64(s.cfg.Count) + for id := 0; id < s.cfg.Count; id++ { // Open db. 
- db, err := db.OpenPebble(ctx, id, s.dir, s.cfg) + db, err := db.OpenPebble(ctx, id, s.dir, int(memInBytePerDB), s.cfg) if err != nil { return errors.Trace(err) } s.dbs = append(s.dbs, db) // Create and spawn compactor actor. compactor, cmb, err := - lsorter.NewCompactActor(id, db, s.closedWg, captureAddr) + lsorter.NewCompactActor(id, db, s.closedWg, s.cfg, captureAddr) if err != nil { return errors.Trace(err) } @@ -169,16 +164,6 @@ func (s *System) Start(ctx context.Context) error { if err != nil { return errors.Trace(err) } - // Create and spawn cleaner actor. - clac, clmb, err := lsorter.NewCleanerActor( - id, db, s.cleanRouter, s.compactSched, s.cfg, s.closedWg) - if err != nil { - return errors.Trace(err) - } - err = s.cleanSystem.Spawn(clmb, clac) - if err != nil { - return errors.Trace(err) - } } s.closedWg.Add(1) go func() { @@ -219,8 +204,7 @@ func (s *System) Stop() error { ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() // Close actors - s.broadcast(ctx, s.dbRouter, message.StopMessage()) - s.broadcast(ctx, s.cleanRouter, message.StopMessage()) + s.broadcast(ctx, s.DBRouter, message.StopMessage()) s.broadcast(ctx, s.compactRouter, message.StopMessage()) // Close metrics goroutine. close(s.closedCh) @@ -232,10 +216,6 @@ func (s *System) Stop() error { if err != nil { return errors.Trace(err) } - err = s.cleanSystem.Stop() - if err != nil { - return errors.Trace(err) - } err = s.compactSystem.Stop() if err != nil { return errors.Trace(err) diff --git a/cdc/sorter/leveldb/system/system_test.go b/cdc/sorter/leveldb/system/system_test.go index ecc4d941714..a5c53e2a0a1 100644 --- a/cdc/sorter/leveldb/system/system_test.go +++ b/cdc/sorter/leveldb/system/system_test.go @@ -27,7 +27,7 @@ func TestSystemStartStop(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - sys := NewSystem(t.TempDir(), cfg) + sys := NewSystem(t.TempDir(), 1, cfg) require.Nil(t, sys.Start(ctx)) require.Nil(t, sys.Stop()) @@ -42,7 +42,7 @@ func TestSystemStopUnstarted(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 1 - sys := NewSystem(t.TempDir(), cfg) + sys := NewSystem(t.TempDir(), 1, cfg) require.Nil(t, sys.Stop()) } @@ -52,19 +52,19 @@ func TestCollectMetrics(t *testing.T) { cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 2 - sys := NewSystem(t.TempDir(), cfg) + sys := NewSystem(t.TempDir(), 1, cfg) require.Nil(t, sys.Start(ctx)) collectMetrics(sys.dbs, "") require.Nil(t, sys.Stop()) } -func TestActorID(t *testing.T) { +func TestDBActorID(t *testing.T) { t.Parallel() ctx := context.Background() cfg := config.GetDefaultServerConfig().Clone().Debug.DB cfg.Count = 2 - sys := NewSystem(t.TempDir(), cfg) + sys := NewSystem(t.TempDir(), 1, cfg) require.Nil(t, sys.Start(ctx)) id1 := sys.ActorID(1) id2 := sys.ActorID(1) diff --git a/cdc/sorter/leveldb/table_sorter.go b/cdc/sorter/leveldb/table_sorter.go index d11f0df917c..4aae9271fd5 100644 --- a/cdc/sorter/leveldb/table_sorter.go +++ b/cdc/sorter/leveldb/table_sorter.go @@ -252,9 +252,9 @@ func (ls *Sorter) buildTask( } return message.Task{ - UID: ls.uid, - TableID: ls.tableID, - Events: writes, + UID: ls.uid, + TableID: ls.tableID, + WriteReq: writes, }, nil } @@ -542,12 +542,12 @@ func (state *pollState) tryGetIterator( state.iterAliveTime = start state.iterResolvedTs = iter.ResolvedTs state.iterHasRead = false - state.iter.First() + state.iter.Seek([]byte("")) duration := time.Since(start) 
state.metricIterFirst.Observe(duration.Seconds()) if duration >= state.iterFirstSlowDuration { // Force trigger a compaction if Iterator.Fisrt is too slow. - state.compact.maybeCompact(state.actorID, int(math.MaxInt32)) + state.compact.tryScheduleCompact(state.actorID, int(math.MaxInt32)) } return nil, true default: @@ -717,7 +717,18 @@ func (ls *Sorter) Output() <-chan *model.PolymorphicEvent { return ls.outputCh } -// CleanupTask returns a clean up task that delete sorter's data. -func (ls *Sorter) CleanupTask() actormsg.Message { - return actormsg.SorterMessage(message.NewCleanupTask(ls.uid, ls.tableID)) +// CleanupFunc returns a function that cleans up sorter's data. +func (ls *Sorter) CleanupFunc() func(context.Context) error { + return func(ctx context.Context) error { + task := message.Task{UID: ls.uid, TableID: ls.tableID} + task.DeleteReq = &message.DeleteRequest{ + // We do not set task.Delete.Count, because we don't know + // how many key-value pairs in the range. + Range: [2][]byte{ + encoding.EncodeTsKey(ls.uid, ls.tableID, 0), + encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0), + }, + } + return ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task)) + } } diff --git a/cdc/sorter/leveldb/table_sorter_test.go b/cdc/sorter/leveldb/table_sorter_test.go index f38cfdc6b10..92769b68580 100644 --- a/cdc/sorter/leveldb/table_sorter_test.go +++ b/cdc/sorter/leveldb/table_sorter_test.go @@ -39,7 +39,7 @@ func newTestSorter( mb := actor.NewMailbox(1, capacity) router.InsertMailbox4Test(id, mb) cfg := config.GetDefaultServerConfig().Clone().Debug.DB - compact := NewCompactScheduler(nil, cfg) + compact := NewCompactScheduler(nil) ls := NewSorter(ctx, 1, 1, router, id, compact, cfg) return ls, mb } @@ -275,11 +275,9 @@ func TestBuildTask(t *testing.T) { expectedEvents[key] = []byte{} } require.EqualValues(t, message.Task{ - UID: ls.uid, - TableID: ls.tableID, - Events: expectedEvents, - Cleanup: false, - CleanupRatelimited: false, + UID: ls.uid, + TableID: ls.tableID, + WriteReq: expectedEvents, }, task, "case #%d, %v", i, cs) } } @@ -673,7 +671,7 @@ func TestOutputIterEvents(t *testing.T) { iter := db.Iterator( encoding.EncodeTsKey(ls.uid, ls.tableID, 0), encoding.EncodeTsKey(ls.uid, ls.tableID, cs.maxResolvedTs+1)) - iter.First() + iter.Seek([]byte("")) require.Nil(t, iter.Error(), "case #%d, %v", i, cs) hasReadLastNext, exhaustedRTs, err := ls.outputIterEvents(iter, cs.hasReadNext, buf, cs.maxResolvedTs) @@ -706,14 +704,13 @@ func TestStateIterator(t *testing.T) { sema := semaphore.NewWeighted(1) metricIterDuration := sorterIterReadDurationHistogram.MustCurryWith( prometheus.Labels{"capture": t.Name(), "id": t.Name()}) - cfg := config.GetDefaultServerConfig().Clone().Debug.DB mb := actor.NewMailbox(1, 1) router := actor.NewRouter(t.Name()) router.InsertMailbox4Test(mb.ID(), mb) state := pollState{ actorID: mb.ID(), iterFirstSlowDuration: 100 * time.Second, - compact: NewCompactScheduler(router, cfg), + compact: NewCompactScheduler(router), iterMaxAliveDuration: 100 * time.Millisecond, metricIterFirst: metricIterDuration.WithLabelValues("first"), metricIterRelease: metricIterDuration.WithLabelValues("release"), @@ -755,7 +752,7 @@ func TestStateIterator(t *testing.T) { Iterator: db.Iterator([]byte{}, []byte{0xff}), Sema: sema, } - require.True(t, state.iter.First()) + require.True(t, state.iter.Seek([]byte(""))) state.iterAliveTime = time.Now() time.Sleep(2 * state.iterMaxAliveDuration) require.Nil(t, state.tryReleaseIterator()) diff --git a/cdc/sorter/unified/backend_pool.go 
b/cdc/sorter/unified/backend_pool.go index 703882ab103..b7c5e301aae 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -165,7 +165,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { sorterConfig := config.GetGlobalServerConfig().Sorter if p.sorterMemoryUsage() < int64(sorterConfig.MaxMemoryConsumption) && - p.memoryPressure() < int32(sorterConfig.MaxMemoryPressure) { + p.memoryPressure() < int32(sorterConfig.MaxMemoryPercentage) { ret := newMemoryBackEnd() return ret, nil diff --git a/cdc/sorter/unified/backend_pool_test.go b/cdc/sorter/unified/backend_pool_test.go index 5f2c71d0afb..4e9d30c89c2 100644 --- a/cdc/sorter/unified/backend_pool_test.go +++ b/cdc/sorter/unified/backend_pool_test.go @@ -47,7 +47,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) { conf := config.GetDefaultServerConfig() conf.DataDir = dataDir conf.Sorter.SortDir = sortDir - conf.Sorter.MaxMemoryPressure = 90 // 90% + conf.Sorter.MaxMemoryPercentage = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) @@ -123,7 +123,7 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) { conf := config.GetGlobalServerConfig() conf.DataDir = dataDir conf.Sorter.SortDir = sortDir - conf.Sorter.MaxMemoryPressure = 0 // force using files + conf.Sorter.MaxMemoryPercentage = 0 // force using files backEndPool, err := newBackEndPool(sortDir, "") c.Assert(err, check.IsNil) @@ -157,7 +157,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) { conf := config.GetDefaultServerConfig() conf.DataDir = dataDir conf.Sorter.SortDir = sorterDir - conf.Sorter.MaxMemoryPressure = 90 // 90% + conf.Sorter.MaxMemoryPercentage = 90 // 90% conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G config.StoreGlobalServerConfig(conf) diff --git a/cdc/sorter/unified/sorter_test.go b/cdc/sorter/unified/sorter_test.go index 7905763a188..df24eab2513 100644 --- a/cdc/sorter/unified/sorter_test.go +++ b/cdc/sorter/unified/sorter_test.go @@ -67,7 +67,7 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, NumWorkerPoolGoroutine: 4, SortDir: sortDir, @@ -95,7 +95,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, SortDir: sortDir, @@ -268,7 +268,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 0, // disable memory sort + MaxMemoryPercentage: 0, // disable memory sort MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, SortDir: sortDir, @@ -312,7 +312,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, SortDir: sortDir, @@ -389,7 +389,7 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 
1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 0, NumWorkerPoolGoroutine: 4, SortDir: sortDir, diff --git a/errors.toml b/errors.toml index 458ca7528ff..1c6ff1085c3 100755 --- a/errors.toml +++ b/errors.toml @@ -1021,6 +1021,11 @@ error = ''' unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s ''' +["CDC:ErrUnimplemented"] +error = ''' +unimplemented %s +''' + ["CDC:ErrUnknownKVEventType"] error = ''' unknown kv optype: %s, entry: %v diff --git a/go.mod b/go.mod index 7666904d5c0..58a4fcab58a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cenkalti/backoff/v4 v4.0.2 github.com/chaos-mesh/go-sqlsmith v0.0.0-20211025024535-03ae33408684 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e - github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5 + github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c github.com/coreos/go-semver v0.3.0 github.com/davecgh/go-spew v1.1.1 github.com/deepmap/oapi-codegen v1.9.0 diff --git a/go.sum b/go.sum index bd351ebeaaf..ee42888fbda 100644 --- a/go.sum +++ b/go.sum @@ -183,8 +183,9 @@ github.com/cockroachdb/errors v1.8.1 h1:A5+txlVZfOqFBDa4mGz2bUWSp0aHElvHX2bKkdbQ github.com/cockroachdb/errors v1.8.1/go.mod h1:qGwQn6JmZ+oMjuLwjWzUNqblqk0xl4CVV3SQbGwK7Ac= github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY= github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= -github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5 h1:Igd6YmtOZ77EgLAIaE9+mHl7+sAKaZ5m4iMI0Dz/J2A= github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs= +github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c h1:ZtrQD4SC7rV+b1apbqCt9pW23DU2KMRPNk8T+YiezPU= +github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs= github.com/cockroachdb/redact v1.0.8 h1:8QG/764wK+vmEYoOlfobpe12EQcS81ukx/a4hdVMxNw= github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeSfSaiCbEBZGKODaixqtHM= diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 9e08e9a549c..91a9f74118f 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -4952,7 +4952,12 @@ "pointradius": 2, "points": false, "renderer": "flot", - "seriesOverrides": [], + "seriesOverrides": [ + { + "alias": "/cache-hit.*/", + "yaxis": 2 + } + ], "spaceLength": 10, "stack": false, "steppedLine": false, @@ -4963,6 +4968,13 @@ "intervalFactor": 1, "legendFormat": "{{capture}}", "refId": "A" + }, + { + "expr": "sum(ticdc_db_block_cache_access_total{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\", type=\"hit\"}) by (capture) / sum(ticdc_db_block_cache_access_total{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}) by (capture)", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "cache-hit-{{capture}}", + "refId": "B" } ], "thresholds": [], @@ -4993,11 +5005,11 @@ "show": true }, { - "format": "short", + "format": "percentunit", "label": null, "logBase": 1, "max": null, - "min": null, + "min": "0", "show": true } ], @@ -5356,18 +5368,11 @@ "steppedLine": false, 
"targets": [ { - "expr": "sum(rate(ticdc_actor_worker_cpu_seconds_total{tidb_cluster=\"$tidb_cluster\", instance=~\"$capture\", name=~\"sorter|cleaner|compactor\"}[1m])) by (name, instance)", + "expr": "sum(rate(ticdc_actor_worker_cpu_seconds_total{tidb_cluster=\"$tidb_cluster\", instance=~\"$capture\", name=~\"sorter.*\"}[1m])) by (name, instance)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{instance}}-{{name}}", "refId": "A" - }, - { - "expr": "sum(rate(ticdc_actor_worker_cpu_seconds_total{tidb_cluster=\"$tidb_cluster\", instance=~\"$capture\", name=\"sorter\"}[1m])) by (name, id, instance)", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{instance}}-{{name}}-{{id}}", - "refId": "B" } ], "thresholds": [], @@ -16066,4 +16071,4 @@ "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", "version": 34 -} +} \ No newline at end of file diff --git a/pkg/actor/system.go b/pkg/actor/system.go index aae25aa0b8c..6af442607ce 100644 --- a/pkg/actor/system.go +++ b/pkg/actor/system.go @@ -34,10 +34,10 @@ import ( const ( // The max number of workers of a system. maxWorkerNum = 64 - // defaultActorBatchSize is the default size of polled actor batch. - defaultActorBatchSize = 1 - // defaultMsgBatchSizePerActor is the default size of receive message batch. - defaultMsgBatchSizePerActor = 64 + // DefaultActorBatchSize is the default size of polled actor batch. + DefaultActorBatchSize = 1 + // DefaultMsgBatchSizePerActor is the default size of receive message batch. + DefaultMsgBatchSizePerActor = 64 ) var ( @@ -284,8 +284,8 @@ func NewSystemBuilder(name string) *SystemBuilder { return &SystemBuilder{ name: name, numWorker: defaultWorkerNum, - actorBatchSize: defaultActorBatchSize, - msgBatchSizePerActor: defaultMsgBatchSizePerActor, + actorBatchSize: DefaultActorBatchSize, + msgBatchSizePerActor: DefaultMsgBatchSizePerActor, } } @@ -457,6 +457,7 @@ func (s *System) poll(ctx context.Context, id int) { rd.Unlock() return } + // Batch receive ready procs. n := rd.batchReceiveProcs(batchPBuf) if n != 0 { diff --git a/pkg/actor/system_test.go b/pkg/actor/system_test.go index c007219f78c..20299b0693a 100644 --- a/pkg/actor/system_test.go +++ b/pkg/actor/system_test.go @@ -317,7 +317,7 @@ func TestActorManyMessageOneSchedule(t *testing.T) { id := ID(777) // To avoid blocking, use a large buffer. - size := defaultMsgBatchSizePerActor * 4 + size := DefaultMsgBatchSizePerActor * 4 ch := make(chan message.Message, size) fa := &forwardActor{ ch: ch, @@ -404,7 +404,7 @@ func TestConcurrentPollSameActor(t *testing.T) { syncCount: syncCount, } id := ID(777) - mb := NewMailbox(id, defaultMsgBatchSizePerActor) + mb := NewMailbox(id, DefaultMsgBatchSizePerActor) require.Nil(t, sys.Spawn(mb, fa)) // Test 5 seconds @@ -447,7 +447,7 @@ func TestPollStoppedActor(t *testing.T) { id := ID(777) // To avoid blocking, use a large buffer. 
- cap := defaultMsgBatchSizePerActor * 4 + cap := DefaultMsgBatchSizePerActor * 4 mb := NewMailbox(id, cap) ch := make(chan int) require.Nil(t, sys.Spawn(mb, &closedActor{ch: ch})) @@ -477,7 +477,7 @@ func TestStoppedActorIsRemovedFromRouter(t *testing.T) { sys.Start(ctx) id := ID(777) - mb := NewMailbox(id, defaultMsgBatchSizePerActor) + mb := NewMailbox(id, DefaultMsgBatchSizePerActor) ch := make(chan int) require.Nil(t, sys.Spawn(mb, &closedActor{ch: ch})) @@ -532,7 +532,7 @@ func TestSendBeforeClose(t *testing.T) { sys.Start(ctx) id := ID(777) - mb := NewMailbox(id, defaultMsgBatchSizePerActor) + mb := NewMailbox(id, DefaultMsgBatchSizePerActor) ch := make(chan struct{}) require.Nil(t, sys.Spawn(mb, &slowActor{ch: ch})) @@ -587,7 +587,7 @@ func TestSendAfterClose(t *testing.T) { id := ID(777) dropCount := 1 - cap := defaultMsgBatchSizePerActor + dropCount + cap := DefaultMsgBatchSizePerActor + dropCount mb := NewMailbox(id, cap) ch := make(chan struct{}) require.Nil(t, sys.Spawn(mb, &slowActor{ch: ch})) @@ -653,7 +653,7 @@ func BenchmarkActorSendReceive(b *testing.B) { sys.Start(ctx) id := ID(777) - size := defaultMsgBatchSizePerActor * 4 + size := DefaultMsgBatchSizePerActor * 4 ch := make(chan message.Message, size) fa := &forwardActor{ ch: ch, diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 515ed511693..8866f147aaf 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -74,7 +74,7 @@ func (o *options) addFlags(cmd *cobra.Command) { cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level") cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting") // 80 is safe on most systems. - cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPressure, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPressure, "system memory usage threshold for forcing in-disk sort") + cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort") // We use 8GB as a safe default before we support local configuration file. 
cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort") cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory") @@ -187,7 +187,7 @@ func (o *options) complete(cmd *cobra.Command) error { case "sorter-chunk-size-limit": cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit case "sorter-max-memory-percentage": - cfg.Sorter.MaxMemoryPressure = o.serverConfig.Sorter.MaxMemoryPressure + cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage case "sorter-max-memory-consumption": cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption case "ca": diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index bbf131fe4ef..34c22dedb80 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -158,7 +158,7 @@ func TestParseCfg(t *testing.T) { Sorter: &config.SorterConfig{ NumConcurrentWorker: 80, ChunkSizeLimit: 50000000, - MaxMemoryPressure: 70, + MaxMemoryPercentage: 70, MaxMemoryConsumption: 60000, NumWorkerPoolGoroutine: 90, SortDir: config.DefaultSortDir, @@ -192,10 +192,10 @@ func TestParseCfg(t *testing.T) { WriteL0SlowdownTrigger: math.MaxInt32, WriteL0PauseTrigger: math.MaxInt32, CompactionL0Trigger: 160, - CompactionDeletionThreshold: 160000, + CompactionDeletionThreshold: 10485760, + CompactionPeriod: 1800, IteratorMaxAliveDuration: 10000, IteratorSlowReadDuration: 256, - CleanupSpeedLimit: 10000, }, // We expect the default configuration here. Messages: &config.MessagesConfig{ @@ -256,9 +256,9 @@ compression = "none" target-file-size-base = 10 compaction-l0-trigger = 11 compaction-deletion-threshold = 15 +compaction-period = 16 write-l0-slowdown-trigger = 12 write-l0-pause-trigger = 13 -cleanup-speed-limit = 14 [debug.messages] client-max-batch-interval = "500ms" @@ -304,7 +304,7 @@ server-worker-pool-size = 16 Sorter: &config.SorterConfig{ NumConcurrentWorker: 4, ChunkSizeLimit: 10000000, - MaxMemoryPressure: 3, + MaxMemoryPercentage: 3, MaxMemoryConsumption: 2000000, NumWorkerPoolGoroutine: 5, SortDir: config.DefaultSortDir, @@ -334,10 +334,10 @@ server-worker-pool-size = 16 CompactionL0Trigger: 11, WriteL0SlowdownTrigger: 12, WriteL0PauseTrigger: 13, - CleanupSpeedLimit: 14, IteratorMaxAliveDuration: 10000, IteratorSlowReadDuration: 256, CompactionDeletionThreshold: 15, + CompactionPeriod: 16, }, Messages: &config.MessagesConfig{ ClientMaxBatchInterval: config.TomlDuration(500 * time.Millisecond), @@ -440,7 +440,7 @@ cert-allowed-cn = ["dd","ee"] Sorter: &config.SorterConfig{ NumConcurrentWorker: 3, ChunkSizeLimit: 50000000, - MaxMemoryPressure: 70, + MaxMemoryPercentage: 70, MaxMemoryConsumption: 60000000, NumWorkerPoolGoroutine: 5, SortDir: config.DefaultSortDir, @@ -474,10 +474,10 @@ cert-allowed-cn = ["dd","ee"] WriteL0SlowdownTrigger: math.MaxInt32, WriteL0PauseTrigger: math.MaxInt32, CompactionL0Trigger: 160, - CompactionDeletionThreshold: 160000, + CompactionDeletionThreshold: 10485760, + CompactionPeriod: 1800, IteratorMaxAliveDuration: 10000, IteratorSlowReadDuration: 256, - CleanupSpeedLimit: 10000, }, // We expect the default configuration here. 
Messages: &config.MessagesConfig{ @@ -533,10 +533,10 @@ unknown3 = 3 WriteL0SlowdownTrigger: math.MaxInt32, WriteL0PauseTrigger: math.MaxInt32, CompactionL0Trigger: 160, - CompactionDeletionThreshold: 160000, + CompactionDeletionThreshold: 10485760, + CompactionPeriod: 1800, IteratorMaxAliveDuration: 10000, IteratorSlowReadDuration: 256, - CleanupSpeedLimit: 10000, }, // We expect the default configuration here. Messages: &config.MessagesConfig{ diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index fca23e8dd77..90a7e911180 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -125,10 +125,10 @@ const ( "write-l0-slowdown-trigger": 2147483647, "write-l0-pause-trigger": 2147483647, "compaction-l0-trigger": 160, - "compaction-deletion-threshold": 160000, + "compaction-deletion-threshold": 10485760, + "compaction-period": 1800, "iterator-max-alive-duration": 10000, - "iterator-slow-read-duration": 256, - "cleanup-speed-limit": 10000 + "iterator-slow-read-duration": 256 }, "enable-new-scheduler": false, "messages": { diff --git a/pkg/config/db.go b/pkg/config/db.go index ca9e0b447b9..24247542f96 100644 --- a/pkg/config/db.go +++ b/pkg/config/db.go @@ -71,10 +71,14 @@ type DBConfig struct { // CompactionDeletionThreshold defines the threshold of the number of deletion that // trigger compaction. // - // The default value is 160000. - // Iterator.First() takes about 27ms to 149ms in this case, - // see pkg/db.BenchmarkNext. + // The default value is 10 * 1024 * 1024, i.e. 10485760. + // Assuming every key-value is about 1KB, 10485760 deletions cover roughly 10GB of data. CompactionDeletionThreshold int `toml:"compaction-deletion-threshold" json:"compaction-deletion-threshold"` + // CompactionPeriod defines the interval between periodic compactions, + // in seconds. + // + // The default value is 1800, i.e. 30 minutes. + CompactionPeriod int `toml:"compaction-period" json:"compaction-period"` // IteratorMaxAliveDuration the maximum iterator alive duration in ms. // @@ -86,11 +90,6 @@ type DBConfig struct { // // The default value is 256, 256ms. IteratorSlowReadDuration int `toml:"iterator-slow-read-duration" json:"iterator-slow-read-duration"` - - // CleanupSpeedLimit limits clean up speed, based on key value entry count. - // - // The default value is 10000.
- CleanupSpeedLimit int `toml:"cleanup-speed-limit" json:"cleanup-speed-limit"` } // ValidateAndAdjust validates and adjusts the db configuration @@ -98,9 +97,6 @@ func (c *DBConfig) ValidateAndAdjust() error { if c.Compression != "none" && c.Compression != "snappy" { return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("sorter.leveldb.compression must be \"none\" or \"snappy\"") } - if c.CleanupSpeedLimit <= 1 { - return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("sorter.leveldb.cleanup-speed-limit must be larger than 1") - } return nil } diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index bb930dd6f0d..f1723c71c89 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -85,7 +85,7 @@ var defaultServerConfig = &ServerConfig{ Sorter: &SorterConfig{ NumConcurrentWorker: 4, ChunkSizeLimit: 128 * 1024 * 1024, // 128MB - MaxMemoryPressure: 30, // 30% is safe on machines with memory capacity <= 16GB + MaxMemoryPercentage: 30, // 30% is safe on machines with memory capacity <= 16GB MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB NumWorkerPoolGoroutine: 16, SortDir: DefaultSortDir, @@ -119,10 +119,10 @@ var defaultServerConfig = &ServerConfig{ WriteL0SlowdownTrigger: math.MaxInt32, WriteL0PauseTrigger: math.MaxInt32, CompactionL0Trigger: 160, - CompactionDeletionThreshold: 160000, + CompactionDeletionThreshold: 10485760, + CompactionPeriod: 1800, IteratorMaxAliveDuration: 10000, IteratorSlowReadDuration: 256, - CleanupSpeedLimit: 10000, }, Messages: defaultMessageConfig.Clone(), }, diff --git a/pkg/config/server_config_test.go b/pkg/config/server_config_test.go index b164defe61d..4c979365e1c 100644 --- a/pkg/config/server_config_test.go +++ b/pkg/config/server_config_test.go @@ -90,6 +90,4 @@ func TestDBConfigValidateAndAdjust(t *testing.T) { require.Nil(t, conf.ValidateAndAdjust()) conf.Compression = "invalid" require.Error(t, conf.ValidateAndAdjust()) - conf.CleanupSpeedLimit = 0 - require.Error(t, conf.ValidateAndAdjust()) } diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go index 1c68c1c5ff2..56ddff8b5eb 100644 --- a/pkg/config/sorter.go +++ b/pkg/config/sorter.go @@ -22,7 +22,7 @@ type SorterConfig struct { // maximum size for a heap ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"` // the maximum memory use percentage that allows in-memory sorting - MaxMemoryPressure int `toml:"max-memory-percentage" json:"max-memory-percentage"` + MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` // the maximum memory consumption allowed for in-memory sorting MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"` // the size of workerpool @@ -48,8 +48,8 @@ func (c *SorterConfig) ValidateAndAdjust() error { if c.NumWorkerPoolGoroutine < 1 { return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at least 1, larger than 8 is recommended") } - if c.MaxMemoryPressure < 0 || c.MaxMemoryPressure > 100 { - return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage") + if c.MaxMemoryPercentage <= 0 || c.MaxMemoryPercentage > 80 { + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage and within (0, 80]") } return nil diff --git a/pkg/db/db.go b/pkg/db/db.go index c4f6713cf92..de32099ca29 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -17,6 +17,7 @@ package db type DB interface { Iterator(lowerBound, upperBound 
[]byte) Iterator Batch(cap int) Batch + DeleteRange(start, end []byte) error Compact(start, end []byte) error Close() error CollectMetrics(captureAddr string, id int) @@ -35,7 +36,6 @@ type Batch interface { // Iterator is an interface of an iterator of a DB. type Iterator interface { Valid() bool - First() bool Seek([]byte) bool Next() bool Key() []byte diff --git a/pkg/db/db_test.go b/pkg/db/db_test.go index d6edcd04c3f..857713bba28 100644 --- a/pkg/db/db_test.go +++ b/pkg/db/db_test.go @@ -43,7 +43,7 @@ func TestDB(t *testing.T) { require.Nil(t, err) testDB(t, db) - db, err = OpenPebble(ctx, 1, filepath.Join(t.TempDir(), "2"), cfg) + db, err = OpenPebble(ctx, 1, filepath.Join(t.TempDir(), "2"), 0, cfg) require.Nil(t, err) testDB(t, db) } @@ -81,9 +81,9 @@ func testDB(t *testing.T, db DB) { Limit: []byte("k4"), }, nil) iter := db.Iterator([]byte(""), []byte("k4")) - // First - require.True(t, liter.First()) - require.True(t, iter.First()) + // Seek + require.True(t, liter.Seek([]byte{})) + require.True(t, iter.Seek([]byte{})) // Valid require.True(t, liter.Valid()) require.True(t, iter.Valid()) @@ -125,7 +125,7 @@ func TestPebbleMetrics(t *testing.T) { cfg.Count = 1 id := 1 - option, ws := buildPebbleOption(id, cfg) + option, ws := buildPebbleOption(id, 0, cfg) db, err := pebble.Open(t.TempDir(), &option) require.Nil(t, err) pdb := &pebbleDB{ @@ -204,7 +204,8 @@ func BenchmarkNext(b *testing.B) { }, { name: "pebble", dbfn: func(name string) DB { - db, err := OpenPebble(ctx, 1, filepath.Join(b.TempDir(), name), cfg) + gb := 1024 * 1024 * 1024 + db, err := OpenPebble(ctx, 1, filepath.Join(b.TempDir(), name), gb, cfg) require.Nil(b, err) return db }, @@ -243,7 +244,7 @@ func BenchmarkNext(b *testing.B) { iter := db.Iterator([]byte{}, bytes.Repeat([]byte{0xff}, len(key))) require.Nil(b, iter.Error()) for i := 0; i < b.N; i++ { - for ok := iter.First(); ok; ok = iter.Next() { + for ok := iter.Seek([]byte{}); ok; ok = iter.Next() { } } }) diff --git a/pkg/db/leveldb.go b/pkg/db/leveldb.go index 7c53a6fae39..06131fdd1ef 100644 --- a/pkg/db/leveldb.go +++ b/pkg/db/leveldb.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/sorter" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/syndtr/goleveldb/leveldb" @@ -100,6 +101,10 @@ func (p *levelDB) Batch(cap int) Batch { } } +func (p *levelDB) DeleteRange(start, end []byte) error { + return errors.ErrUnimplemented.FastGenByArgs("leveldb.DeleteRange") +} + func (p *levelDB) Compact(start, end []byte) error { return p.db.CompactRange(util.Range{Start: start, Limit: end}) } @@ -182,10 +187,6 @@ func (i leveldbIter) Valid() bool { return i.Iterator.Valid() } -func (i leveldbIter) First() bool { - return i.Iterator.First() -} - func (i leveldbIter) Next() bool { return i.Iterator.Next() } diff --git a/pkg/db/metrics.go b/pkg/db/metrics.go index c02b2edb313..885d826fc69 100644 --- a/pkg/db/metrics.go +++ b/pkg/db/metrics.go @@ -66,6 +66,13 @@ var ( Name: "write_delay_total", Help: "The total number of leveldb delay", }, []string{"capture", "id"}) + + dbBlockCacheAccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "db", + Name: "block_cache_access_total", + Help: "The total number of leveldb block cache access", + }, []string{"capture", "id", "type"}) ) // InitMetrics registers all metrics in this file @@ -77,4 +84,5 @@ func InitMetrics(registry 
*prometheus.Registry) { registry.MustRegister(dbReadBytes) registry.MustRegister(dbWriteDelayDuration) registry.MustRegister(dbWriteDelayCount) + registry.MustRegister(dbBlockCacheAccess) } diff --git a/pkg/db/pebble.go b/pkg/db/pebble.go index 4c458ab33dd..a887b83f33b 100644 --- a/pkg/db/pebble.go +++ b/pkg/db/pebble.go @@ -47,12 +47,12 @@ func (logger *pebbleLogger) Fatalf(format string, args ...interface{}) { // TODO: Update DB config once we switch to pebble, // as some configs are not applicable to pebble. -func buildPebbleOption(id int, cfg *config.DBConfig) (pebble.Options, *writeStall) { +func buildPebbleOption(id int, memInByte int, cfg *config.DBConfig) (pebble.Options, *writeStall) { var option pebble.Options option.ErrorIfExists = true - option.DisableWAL = true + option.DisableWAL = false // Delete range requires WAL. option.MaxOpenFiles = cfg.MaxOpenFiles / cfg.Count - option.Cache = pebble.NewCache(int64(cfg.BlockCacheSize / cfg.Count)) + option.Cache = pebble.NewCache(int64(memInByte)) option.MaxConcurrentCompactions = 6 option.L0CompactionThreshold = cfg.CompactionL0Trigger option.L0StopWritesThreshold = cfg.WriteL0PauseTrigger @@ -98,8 +98,10 @@ func buildPebbleOption(id int, cfg *config.DBConfig) (pebble.Options, *writeStal } // OpenPebble opens a pebble. -func OpenPebble(ctx context.Context, id int, path string, cfg *config.DBConfig) (DB, error) { - option, ws := buildPebbleOption(id, cfg) +func OpenPebble( + ctx context.Context, id int, path string, memInByte int, cfg *config.DBConfig, +) (DB, error) { + option, ws := buildPebbleOption(id, memInByte, cfg) dbDir := filepath.Join(path, fmt.Sprintf("%04d", id)) err := retry.Do(ctx, func() error { err1 := os.RemoveAll(dbDir) @@ -151,6 +153,10 @@ func (p *pebbleDB) Batch(cap int) Batch { } } +func (p *pebbleDB) DeleteRange(start, end []byte) error { + return p.db.DeleteRange(start, end, nil) +} + func (p *pebbleDB) Compact(start, end []byte) error { return p.db.Compact(start, end) } @@ -190,6 +196,10 @@ func (p *pebbleDB) CollectMetrics(captureAddr string, i int) { for level, metric := range stats.Levels { metricLevelCount.WithLabelValues(fmt.Sprint(level)).Set(float64(metric.NumFiles)) } + dbBlockCacheAccess. + WithLabelValues(captureAddr, id, "hit").Set(float64(stats.BlockCache.Hits)) + dbBlockCacheAccess. 
+ WithLabelValues(captureAddr, id, "miss").Set(float64(stats.BlockCache.Misses)) } type pebbleBatch struct { @@ -232,10 +242,6 @@ func (i pebbleIter) Valid() bool { return i.Iterator.Valid() } -func (i pebbleIter) First() bool { - return i.Iterator.First() -} - func (i pebbleIter) Seek(key []byte) bool { return i.Iterator.SeekGE(key) } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 9a20a414a6e..50e1ab4d1f6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -19,6 +19,9 @@ import ( // errors var ( + // general errors + ErrUnimplemented = errors.Normalize("unimplemented %s", errors.RFCCodeText("CDC:ErrUnimplemented")) + // kv related errors ErrWriteTsConflict = errors.Normalize("write ts conflict", errors.RFCCodeText("CDC:ErrWriteTsConflict")) ErrChangeFeedNotExists = errors.Normalize("changefeed not exists, %s", errors.RFCCodeText("CDC:ErrChangeFeedNotExists")) diff --git a/tests/utils/many_sorters_test/many_sorters.go b/tests/utils/many_sorters_test/many_sorters.go index e6cb6bcfa6b..69c4c600c68 100644 --- a/tests/utils/many_sorters_test/many_sorters.go +++ b/tests/utils/many_sorters_test/many_sorters.go @@ -55,7 +55,7 @@ func main() { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, NumWorkerPoolGoroutine: 16, } diff --git a/tests/utils/sorter_stress_test/sorter_stress.go b/tests/utils/sorter_stress_test/sorter_stress.go index 3e1d61b0538..0f4985c1314 100644 --- a/tests/utils/sorter_stress_test/sorter_stress.go +++ b/tests/utils/sorter_stress_test/sorter_stress.go @@ -52,7 +52,7 @@ func main() { conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, ChunkSizeLimit: 1 * 1024 * 1024 * 1024, - MaxMemoryPressure: 60, + MaxMemoryPercentage: 60, MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, } config.StoreGlobalServerConfig(conf)
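A few usage sketches for the surfaces touched above follow; package names, helpers, and literal values that do not appear in the diff are illustrative only. First, the exported actor batch-size constants let other packages size mailboxes without hard-coding 64. A minimal sketch:

    package dbsorter // illustrative package

    import "github.com/pingcap/tiflow/pkg/actor"

    // newMailbox leaves a few polling batches of headroom so that senders are
    // unlikely to block on a full mailbox; the multiplier is an assumption.
    func newMailbox(id actor.ID) actor.Mailbox {
        return actor.NewMailbox(id, actor.DefaultMsgBatchSizePerActor*4)
    }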
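For operators, the visible change in the debug DB section is that cleanup-speed-limit is gone and two compaction knobs replace it. A hedged sketch of overriding them in code, assuming the existing config.GetDefaultServerConfig helper and the Debug.DB accessor:

    cfg := config.GetDefaultServerConfig()
    // Compact once roughly 10M entries have been deleted
    // (about 10GB of data at ~1KB per key-value).
    cfg.Debug.DB.CompactionDeletionThreshold = 10 * 1024 * 1024
    // Also compact periodically; the value is in seconds (1800 = 30 minutes).
    cfg.Debug.DB.CompactionPeriod = 1800
    config.StoreGlobalServerConfig(cfg)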
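The code that consumes the two knobs is outside this diff; conceptually a compaction is scheduled either once enough deletions have accumulated or once the period has elapsed. A rough sketch under that assumption (maybeCompact is hypothetical, the real policy lives in the sorter/compactor actors; imports of time and pkg/config elided):

    // maybeCompact reports whether a compaction should be scheduled now.
    func maybeCompact(deleteCount int, lastCompact time.Time, cfg *config.DBConfig) bool {
        if deleteCount >= cfg.CompactionDeletionThreshold {
            return true // enough delete tombstones to be worth reclaiming space
        }
        // CompactionPeriod is expressed in seconds.
        return time.Since(lastCompact) >= time.Duration(cfg.CompactionPeriod)*time.Second
    }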
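The tightened max-memory-percentage bound now rejects 0 and anything above 80 instead of accepting the full 0 to 100 range. A quick check, assuming config.GetDefaultServerConfig returns the defaults shown above (imports elided):

    sorterCfg := config.GetDefaultServerConfig().Sorter
    sorterCfg.MaxMemoryPercentage = 80
    err := sorterCfg.ValidateAndAdjust() // nil: 80 is the inclusive upper bound

    sorterCfg.MaxMemoryPercentage = 81
    err = sorterCfg.ValidateAndAdjust() // ErrIllegalSorterParameter: must be within (0, 80]
    _ = err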
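OpenPebble now takes the block-cache budget in bytes instead of deriving it from BlockCacheSize/Count (the diff also re-enables the WAL, noting that delete range requires it). A fragment mirroring the updated benchmark, with an illustrative path and a 1GB budget:

    gb := 1024 * 1024 * 1024
    cfg := config.GetDefaultServerConfig().Debug.DB
    database, err := db.OpenPebble(context.Background(), 1, "/tmp/cdc-sort", gb, cfg)
    if err != nil {
        return err
    }
    defer database.Close()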
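With DeleteRange on the DB interface, table cleanup can collapse the old iterate-and-batch-delete loop into a single ranged delete on backends that support it. A minimal sketch, assuming the table's key range [start, end) is computed by the caller:

    // cleanupTable drops every key in [start, end) that belongs to one table.
    func cleanupTable(database db.DB, start, end []byte) error {
        return database.DeleteRange(start, end)
    }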
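Only the pebble backend implements DeleteRange; the goleveldb backend returns the new CDC:ErrUnimplemented. A caller that may face either backend could branch on it; a hedged sketch where the iterator-based fallback is hypothetical and cerrors is the usual pkg/errors alias:

    if err := database.DeleteRange(start, end); err != nil {
        if cerrors.ErrUnimplemented.Equal(err) {
            // No ranged delete on this backend; fall back to iterating the
            // range and deleting keys through a write batch.
            return deleteRangeByIteration(database, start, end) // hypothetical fallback
        }
        return err
    }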