Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sorter/leveldb(ticdc): separate write from table sorter #4686

Merged
merged 7 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg *

if config.GetGlobalServerConfig().Debug.EnableDBSorter {
startTs := ctx.ChangefeedVars().Info.StartTs
actorID := ctx.GlobalVars().SorterSystem.ActorID(uint64(n.tableID))
router := ctx.GlobalVars().SorterSystem.Router()
actorID := ctx.GlobalVars().SorterSystem.DBActorID(uint64(n.tableID))
router := ctx.GlobalVars().SorterSystem.DBRouter
compactScheduler := ctx.GlobalVars().SorterSystem.CompactScheduler()
levelSorter := leveldb.NewSorter(
ctx, n.tableID, startTs, router, actorID, compactScheduler,
Expand Down
16 changes: 16 additions & 0 deletions cdc/sorter/leveldb/message/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sorter/encoding"
"github.com/pingcap/tiflow/pkg/db"
"golang.org/x/sync/semaphore"
Expand All @@ -27,11 +28,20 @@ type Task struct {
UID uint32
TableID uint64

// Input unsorted event for writers.
// Sorter.AddEntry -> writer.
InputEvent *model.PolymorphicEvent
// Latest resolved ts / commit ts for readers.
// writer -> reader
ReadTs ReadTs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding so many fields to one struct makes the logic complex and hard to read, and some fields are always nil in some cases.
And the Task is a part of Message, so its size will grow larger and larger; maybe we should find a better way to extend the Task or Message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point! Ideally, tasks would be an enum type as in Rust, so that each kind of task is separate and more readable. Unfortunately, Go does not support that.

And the Task is a part of Message, so its size will grow larger and larger; maybe we should find a better way to extend the Task or Message.

Maybe we should make Task a pointer in Message? That adds a memory allocation, though, and I'm not sure whether it helps performance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also see the notes in #4631.

// A batch of events (bytes encoded) that need to be written.
// writer -> leveldb
WriteReq map[Key][]byte
// Requests an iterator when it is not nil.
// reader -> leveldb
IterReq *IterRequest
// Deletes all of the key-values in the range.
// reader -> leveldb and leveldb -> compactor
DeleteReq *DeleteRequest
}

Expand All @@ -42,6 +52,12 @@ type DeleteRequest struct {
Count int
}

// ReadTs wraps the latest resolved ts and commit ts.
// Task carries it in its ReadTs field, which is documented there as
// flowing from writer to reader.
type ReadTs struct {
// MaxCommitTs is the latest commit ts.
MaxCommitTs uint64
// MaxResolvedTs is the latest resolved ts.
MaxResolvedTs uint64
}

// IterRequest contains parameters that necessary to build an iterator.
type IterRequest struct {
UID uint32
Expand Down
25 changes: 18 additions & 7 deletions cdc/sorter/leveldb/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type System struct {
dbs []db.DB
dbSystem *actor.System
DBRouter *actor.Router
WriterSystem *actor.System
WriterRouter *actor.Router
compactSystem *actor.System
compactRouter *actor.Router
compactSched *lsorter.CompactScheduler
Expand All @@ -64,14 +66,22 @@ type System struct {

// NewSystem returns a system.
func NewSystem(dir string, memPercentage float64, cfg *config.DBConfig) *System {
// A system that polls actors which read and write leveldb.
dbSystem, dbRouter := actor.NewSystemBuilder("sorter-db").
WorkerNumber(cfg.Count).Build()
// A system that polls actors which compact leveldb (garbage collection).
compactSystem, compactRouter := actor.NewSystemBuilder("sorter-compactor").
WorkerNumber(cfg.Count).Build()
// A system that polls actors which receive events from Puller and send
// batched writes to leveldb.
writerSystem, writerRouter := actor.NewSystemBuilder("sorter-writer").
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have both sorter-db and sorter-writer; can you add some comments about sorter-db?

WorkerNumber(cfg.Count).Throughput(4, 64).Build()
compactSched := lsorter.NewCompactScheduler(compactRouter)
return &System{
dbSystem: dbSystem,
DBRouter: dbRouter,
WriterSystem: writerSystem,
WriterRouter: writerRouter,
compactSystem: compactSystem,
compactRouter: compactRouter,
compactSched: compactSched,
Expand All @@ -85,20 +95,15 @@ func NewSystem(dir string, memPercentage float64, cfg *config.DBConfig) *System
}
}

// ActorID returns an ActorID correspond with tableID.
func (s *System) ActorID(tableID uint64) actor.ID {
// DBActorID maps tableID to the ID of a db actor.
//
// The table ID is encoded as 8 little-endian bytes, hashed with FNV-1
// (64 bit), and reduced modulo the configured actor count, so the same
// tableID always yields the same actor ID.
func (s *System) DBActorID(tableID uint64) actor.ID {
	var encoded [8]byte
	binary.LittleEndian.PutUint64(encoded[:], tableID)

	hasher := fnv.New64()
	hasher.Write(encoded[:])

	slot := hasher.Sum64() % uint64(s.cfg.Count)
	return actor.ID(slot)
}

// Router returns the router of the db actors, i.e. the DBRouter field.
func (s *System) Router() *actor.Router {
return s.DBRouter
}

// CompactScheduler returns compaction scheduler.
func (s *System) CompactScheduler() *lsorter.CompactScheduler {
return s.compactSched
Expand Down Expand Up @@ -131,6 +136,7 @@ func (s *System) Start(ctx context.Context) error {

s.compactSystem.Start(ctx)
s.dbSystem.Start(ctx)
s.WriterSystem.Start(ctx)
captureAddr := config.GetGlobalServerConfig().AdvertiseAddr
totalMemory, err := memory.MemTotal()
if err != nil {
Expand Down Expand Up @@ -205,6 +211,7 @@ func (s *System) Stop() error {
defer cancel()
// Close actors
s.broadcast(ctx, s.DBRouter, message.StopMessage())
s.broadcast(ctx, s.WriterRouter, message.StopMessage())
s.broadcast(ctx, s.compactRouter, message.StopMessage())
// Close metrics goroutine.
close(s.closedCh)
Expand All @@ -216,6 +223,10 @@ func (s *System) Stop() error {
if err != nil {
return errors.Trace(err)
}
err = s.WriterSystem.Stop()
if err != nil {
return errors.Trace(err)
}
err = s.compactSystem.Stop()
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sorter/leveldb/system/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestDBActorID(t *testing.T) {

sys := NewSystem(t.TempDir(), 1, cfg)
require.Nil(t, sys.Start(ctx))
id1 := sys.ActorID(1)
id2 := sys.ActorID(1)
id1 := sys.DBActorID(1)
id2 := sys.DBActorID(1)
// tableID to actor ID must be deterministic.
require.Equal(t, id1, id2)
require.Nil(t, sys.Stop())
Expand Down
59 changes: 43 additions & 16 deletions cdc/sorter/leveldb/table_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,34 @@ func allocID() uint32 {
return atomic.AddUint32(&levelDBSorterIDAlloc, 1)
}

// Sorter accepts out-of-order raw kv entries and output sorted entries
type Sorter struct {
actorID actor.ID
router *actor.Router
compact *CompactScheduler
// common groups the fields that Sorter embeds: where to send messages for
// the db actor, which table is being sorted, and the channel used to
// report errors.
type common struct {
// dbActorID is the ID of the actor that receives messages sent via dbRouter.
dbActorID actor.ID
// dbRouter is the router used to send sorter messages to dbActorID.
dbRouter *actor.Router

// uid is the sorter ID; NewSorter obtains it from allocID.
uid uint32
// tableID is the ID of the table, it is also attached to warning logs.
tableID uint64
// serde is a MsgPack serializer/deserializer.
// NOTE(review): presumably used to encode/decode events — confirm at the call sites.
serde *encoding.MsgPackGenSerde
// errCh receives the error passed to reportError.
// NewSorter creates it with a buffer of one element.
errCh chan error
}

// reportError hands err over to errCh so that Sorter can return it and close.
//
// The error is logged as a warning together with the table ID, unless its
// cause is context.Canceled. Sending to errCh never blocks: when the channel
// cannot accept the error right now, the error is dropped.
func (c *common) reportError(msg string, err error) {
	if cause := errors.Cause(err); cause != context.Canceled {
		// Skip one caller frame so the log entry points at the caller of
		// reportError instead of this helper.
		logger := log.L().WithOptions(zap.AddCallerSkip(1))
		logger.Warn(msg, zap.Uint64("tableID", c.tableID), zap.Error(err))
	}

	select {
	case c.errCh <- err:
	default:
		// It means there is an error already.
	}
}

// Sorter accepts out-of-order raw kv entries and outputs sorted entries.
type Sorter struct {
common

compact *CompactScheduler

iterMaxAliveDuration time.Duration
iterFirstSlowDuration time.Duration
Expand Down Expand Up @@ -85,13 +105,16 @@ func NewSorter(
metricIterDuration := sorterIterReadDurationHistogram.MustCurryWith(
prometheus.Labels{"capture": captureAddr, "id": changefeedID})
return &Sorter{
actorID: actorID,
router: router,
common: common{
dbActorID: actorID,
dbRouter: router,
uid: allocID(),
tableID: uint64(tableID),
serde: &encoding.MsgPackGenSerde{},
errCh: make(chan error, 1),
},
compact: compact,
uid: allocID(),
tableID: uint64(tableID),
lastSentResolvedTs: startTs,
serde: &encoding.MsgPackGenSerde{},

iterMaxAliveDuration: time.Duration(cfg.IteratorMaxAliveDuration) * time.Millisecond,
iterFirstSlowDuration: time.Duration(cfg.IteratorSlowReadDuration) * time.Millisecond,
Expand All @@ -111,6 +134,8 @@ func (ls *Sorter) waitInput(ctx context.Context) (*model.PolymorphicEvent, error
select {
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
case err := <-ls.errCh:
return nil, errors.Trace(err)
case ev := <-ls.inputCh:
return ev, nil
}
Expand All @@ -129,6 +154,8 @@ func (ls *Sorter) waitInputOutput(
select {
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
case err := <-ls.errCh:
return nil, errors.Trace(err)
case ev := <-ls.inputCh:
return ev, nil
case ls.outputCh <- dummyEvent:
Expand Down Expand Up @@ -187,7 +214,7 @@ func (ls *Sorter) wait(
if err != nil {
atomic.StoreInt32(&ls.closed, 1)
close(ls.outputCh)
return 0, 0, 0, errors.Trace(ctx.Err())
return 0, 0, 0, errors.Trace(err)
}
if ev == nil {
// No input event and output is available.
Expand All @@ -200,7 +227,7 @@ func (ls *Sorter) wait(
if err != nil {
atomic.StoreInt32(&ls.closed, 1)
close(ls.outputCh)
return 0, 0, 0, errors.Trace(ctx.Err())
return 0, 0, 0, errors.Trace(err)
}
appendInputEvent(ev)
}
Expand Down Expand Up @@ -629,13 +656,13 @@ func (ls *Sorter) poll(ctx context.Context, state *pollState) error {
return errors.Trace(err)
}
// Send write task to leveldb.
return ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
}

var hasIter bool
task.IterReq, hasIter = state.tryGetIterator(ls.uid, ls.tableID)
// Send write/read task to leveldb.
err = ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
err = ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
if err != nil {
// Skip read iterator if send fails.
return errors.Trace(err)
Expand Down Expand Up @@ -668,7 +695,7 @@ func (ls *Sorter) Run(ctx context.Context) error {
maxResolvedTs: uint64(0),
exhaustedResolvedTs: uint64(0),

actorID: ls.actorID,
actorID: ls.dbActorID,
compact: ls.compact,
iterFirstSlowDuration: ls.iterFirstSlowDuration,
iterMaxAliveDuration: ls.iterMaxAliveDuration,
Expand Down Expand Up @@ -729,6 +756,6 @@ func (ls *Sorter) CleanupFunc() func(context.Context) error {
encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0),
},
}
return ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
}
}
46 changes: 21 additions & 25 deletions cdc/sorter/leveldb/table_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
actormsg "github.com/pingcap/tiflow/pkg/actor/message"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/db"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"
Expand All @@ -44,6 +45,26 @@ func newTestSorter(
return ls, mb
}

// TestRunAndReportError checks that Run returns an error once reportError
// has been called, and that further reportError calls return without
// blocking.
func TestRunAndReportError(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, _ := newTestSorter(ctx, 2)
// Report an error from another goroutine shortly after Run has started.
go func() {
time.Sleep(100 * time.Millisecond)
s.common.reportError(
"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
}()
// ctx is not canceled before this point, so the error returned by Run is
// expected to come from the reportError call above.
require.Error(t, s.Run(ctx))

// reportError must not block, even when it is called repeatedly after Run
// has returned.
s.common.reportError(
"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
s.common.reportError(
"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
}

func TestInputOutOfOrder(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -471,15 +492,6 @@ func TestOutputBufferedResolvedEvents(t *testing.T) {
}
}

// newTestEvent builds a polymorphic event wrapping a put entry.
// The key is the single byte byte(key); crts and startTs become the entry's
// CRTs and StartTs.
func newTestEvent(crts, startTs uint64, key int) *model.PolymorphicEvent {
	rawEntry := &model.RawKVEntry{
		OpType:  model.OpTypePut,
		Key:     []byte{byte(key)},
		StartTs: startTs,
		CRTs:    crts,
	}
	return model.NewPolymorphicEvent(rawEntry)
}

func prepareTxnData(
t *testing.T, ls *Sorter, txnCount, txnSize int,
) db.DB {
Expand All @@ -501,22 +513,6 @@ func prepareTxnData(
return db
}

// receiveOutputEvents drains outputCh without blocking and returns the
// events in the order they were received. It returns as soon as no event is
// immediately available; the result is an empty, non-nil slice when nothing
// was received.
func receiveOutputEvents(
	outputCh chan *model.PolymorphicEvent,
) []*model.PolymorphicEvent {
	received := make([]*model.PolymorphicEvent, 0)
	for {
		select {
		case event := <-outputCh:
			received = append(received, event)
		default:
			// Nothing is ready right now: stop draining.
			return received
		}
	}
}

func TestOutputIterEvents(t *testing.T) {
t.Parallel()

Expand Down
Loading