lightning: buffer all region jobs and consider store balance (#56112)
close #54886, close #56113
lance6716 authored Sep 25, 2024
1 parent a159c49 commit 918b58d
Showing 5 changed files with 644 additions and 80 deletions.
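
This commit rewires Lightning's local-backend import pipeline: region jobs are buffered and, optionally, dispatched through a storeBalancer that sits between jobToWorkerCh and the workers, presumably so that jobs whose target TiKV stores are least loaded can be handed out first; each worker reports the executed job's peers back through an afterExecuteJob callback (releaseStoreLoad), and a debug-only assertion checks that every entry of storeLoadMap returns to zero at the end. The storeBalancer implementation itself lives in a file that is not expanded on this page, so the following is only a rough sketch of the idea under assumed names (regionJob, storeIDs, add, pick, release are illustrative, not the commit's API):

```go
// A minimal sketch of a store-load balancer, assuming the goal is to hand out
// the buffered job whose target stores currently carry the fewest in-flight
// jobs. All names here are illustrative, not taken from the commit.
package balance

import "sync"

type regionJob struct {
	storeIDs []uint64 // stores holding the region's peers
}

type balancer struct {
	mu      sync.Mutex
	load    map[uint64]int // in-flight job count per store
	pending []*regionJob   // buffered jobs waiting for a free worker
}

func newBalancer() *balancer {
	return &balancer{load: make(map[uint64]int)}
}

// add buffers a job until a worker asks for one.
func (b *balancer) add(j *regionJob) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, j)
}

// pick returns the buffered job whose peer stores carry the least total load,
// and charges that load to the stores. It returns nil if nothing is buffered.
func (b *balancer) pick() *regionJob {
	b.mu.Lock()
	defer b.mu.Unlock()
	bestIdx, bestScore := -1, 0
	for i, j := range b.pending {
		score := 0
		for _, s := range j.storeIDs {
			score += b.load[s]
		}
		if bestIdx == -1 || score < bestScore {
			bestIdx, bestScore = i, score
		}
	}
	if bestIdx == -1 {
		return nil
	}
	j := b.pending[bestIdx]
	b.pending = append(b.pending[:bestIdx], b.pending[bestIdx+1:]...)
	for _, s := range j.storeIDs {
		b.load[s]++
	}
	return j
}

// release undoes the load charged by pick; cf. releaseStoreLoad in the diff,
// which workers call after executing a job.
func (b *balancer) release(storeIDs []uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, s := range storeIDs {
		b.load[s]--
	}
}
```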
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
@@ -60,6 +60,7 @@ go_library(
"//pkg/util/compress",
"//pkg/util/engine",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/ranger",
187 changes: 128 additions & 59 deletions pkg/lightning/backend/local/local.go
@@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/intest"
tikvclient "github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
@@ -867,12 +868,8 @@ func getRegionSplitKeys(
return keys, err
}

// prepareAndSendJob will read the engine to get estimated key range,
// then split and scatter regions for these range and send region jobs to jobToWorkerCh.
// NOTE when ctx is Done, this function will NOT return error even if it hasn't sent
// all the jobs to jobToWorkerCh. This is because the "first error" can only be
// found by checking the work group LATER, we don't want to return an error to
// seize the "first" error.
// prepareAndSendJob will read the engine to get estimated key range, then split
// and scatter regions for these ranges and send region jobs to jobToWorkerCh.
func (local *Backend) prepareAndSendJob(
ctx context.Context,
engine common.Engine,
@@ -982,9 +979,7 @@ func (local *Backend) generateAndSendJob(
case <-egCtx.Done():
// this job is not put into jobToWorkerCh
job.done(jobWg)
// if the context is canceled, it means worker has error, the first error can be
// found by worker's error group LATER. if this function returns an error it will
// seize the "first error".
// if the context is canceled, it means a worker has hit an error.
return nil
case jobToWorkerCh <- job:
}
@@ -1077,12 +1072,14 @@ func (local *Backend) generateJobForRange(
}

// startWorker creates a worker that reads from the job channel and processes.
// startWorker will return nil if it's expected to stop, where the only case is
// the context canceled. It will return not nil error when it actively stops.
// startWorker must Done the jobWg if it does not put the job into jobOutCh.
// startWorker returns nil when it's expected to stop: either all jobs are
// finished, or the context is canceled because another component reported an
// error. It returns a non-nil error when it stops on its own error. startWorker
// must call job.done() if it does not put the job into jobOutCh.
func (local *Backend) startWorker(
ctx context.Context,
jobInCh, jobOutCh chan *regionJob,
afterExecuteJob func([]*metapb.Peer),
jobWg *sync.WaitGroup,
) error {
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Set(0)
@@ -1092,17 +1089,29 @@ func (local *Backend) startWorker(
return nil
case job, ok := <-jobInCh:
if !ok {
// In fact we don't close the input channel to notify workers to exit,
// because there's a cycle in the workflow.
return nil
}

var peers []*metapb.Peer
// in unit tests, we may not have the real peers
if job.region != nil && job.region.Region != nil {
peers = job.region.Region.GetPeers()
}
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Inc()
err := local.executeJob(ctx, job)
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Dec()

if afterExecuteJob != nil {
afterExecuteJob(peers)
}
switch job.stage {
case regionScanned, wrote, ingested:
jobOutCh <- job
select {
case <-ctx.Done():
job.done(jobWg)
return nil
case jobOutCh <- job:
}
case needRescan:
jobs, err2 := local.generateJobForRange(
ctx,
@@ -1126,7 +1135,12 @@
}
for _, j := range jobs {
j.lastRetryableErr = job.lastRetryableErr
jobOutCh <- j
select {
case <-ctx.Done():
j.done(jobWg)
// don't exit here; we mark each job done and exit in the outer loop
case jobOutCh <- j:
}
}
}

@@ -1364,63 +1378,100 @@ func (local *Backend) doImport(
regionSplitSize, regionSplitKeyCnt int64,
) error {
/*
[prepareAndSendJob]-----jobToWorkerCh--->[workers]
^ |
| jobFromWorkerCh
| |
| v
[regionJobRetryer]<--[dispatchJobGoroutine]-->done
[prepareAndSendJob]---jobToWorkerCh->[storeBalancer(optional)]->[workers]
^ |
| jobFromWorkerCh
| |
| v
[regionJobRetryer]<-------------[dispatchJobGoroutine]-->done
*/

// Above is the happy-path workflow of region jobs. A job is generated by
// prepareAndSendJob and moved to the "done" state by dispatchJobGoroutine. We
// maintain the invariant that the number of generated jobs (after job.ref())
// minus the number of "done" jobs (after job.done()) equals the jobWg counter,
// so jobWg can be used to wait for all jobs to finish.
//
// In the error case we still maintain the invariant, but the workflow becomes
// a bit more complex. When an error occurs, the component that owns a job must
// either convert the job to the "done" state or send all of its owned jobs to
// the next component. The exit order matters: if the next component exits
// before the owner component, a deadlock will happen.
//
// All components are spawned by workGroup so the main goroutine can wait for
// all of them to exit. The component exit order in the happy path is:
//
// 1. prepareAndSendJob finishes; its goroutine waits for all jobs to finish
// via jobWg.Wait(), then exits and closes the output channel of the workers.
//
// 2. One by one, when each component sees that its input channel is closed, it
// knows the workflow is finished. It exits and (except for the workers) closes
// its output channel, which is the input channel of the next component.
//
// 3. Now that all components have exited, the main goroutine can exit after
// workGroup.Wait().
//
// The component exit order in the error case is:
//
// 1. The erroring component exits and causes workGroup's context to be
// canceled.
//
// 2. All other components exit because of the canceled context. There is no
// need to close channels.
//
// 3. The main goroutine sees the error and exits after workGroup.Wait().
var (
ctx2, workerCancel = context.WithCancel(ctx)
// workerCtx.Done() means workflow is canceled by error. It may be caused
// by calling workerCancel() or workers in workGroup meets error.
workGroup, workerCtx = util.NewErrorGroupWithRecoverWithCtx(ctx2)
firstErr common.OnceError
workGroup, workerCtx = util.NewErrorGroupWithRecoverWithCtx(ctx)
// jobToWorkerCh and jobFromWorkerCh are unbuffered so jobs will not be
// owned by them.
jobToWorkerCh = make(chan *regionJob)
jobFromWorkerCh = make(chan *regionJob)
// jobWg tracks the number of jobs in this workflow.
// prepareAndSendJob, workers and regionJobRetryer can own jobs.
// When cancel on error, the goroutine of above three components have
// responsibility to Done jobWg of their owning jobs.
jobWg sync.WaitGroup
dispatchJobGoroutine = make(chan struct{})
jobWg sync.WaitGroup
balancer *storeBalancer
)
defer workerCancel()

// storeBalancer does not apply backpressure, so it should not be used with the
// external engine, to avoid OOM.
if _, ok := engine.(*Engine); ok {
balancer = newStoreBalancer(jobToWorkerCh, &jobWg)
workGroup.Go(func() error {
return balancer.run(workerCtx)
})
}

failpoint.Inject("injectVariables", func() {
jobToWorkerCh = testJobToWorkerCh
testJobWg = &jobWg
})

retryer := startRegionJobRetryer(workerCtx, jobToWorkerCh, &jobWg)
retryer := newRegionJobRetryer(workerCtx, jobToWorkerCh, &jobWg)
workGroup.Go(func() error {
retryer.run()
return nil
})

// dispatchJobGoroutine handles processed job from worker, it will only exit
// when jobFromWorkerCh is closed to avoid worker is blocked on sending to
// jobFromWorkerCh.
defer func() {
// use defer to close jobFromWorkerCh after all workers are exited
close(jobFromWorkerCh)
<-dispatchJobGoroutine
}()
go func() {
defer close(dispatchJobGoroutine)
// dispatchJobGoroutine
workGroup.Go(func() error {
var (
job *regionJob
ok bool
)
for {
job, ok := <-jobFromWorkerCh
select {
case <-workerCtx.Done():
return nil
case job, ok = <-jobFromWorkerCh:
}
if !ok {
return
retryer.close()
return nil
}
switch job.stage {
case regionScanned, wrote:
job.retryCount++
if job.retryCount > maxWriteAndIngestRetryTimes {
firstErr.Set(job.lastRetryableErr)
workerCancel()
job.done(&jobWg)
continue
return job.lastRetryableErr
}
// max retry backoff time: 2+4+8+16+30*26=810s
sleepSecond := math.Pow(2, float64(job.retryCount))
@@ -1444,15 +1495,21 @@
panic("should not reach here")
}
}
}()
})

failpoint.Inject("skipStartWorker", func() {
failpoint.Goto("afterStartWorker")
})

for i := 0; i < local.WorkerConcurrency; i++ {
workGroup.Go(func() error {
return local.startWorker(workerCtx, jobToWorkerCh, jobFromWorkerCh, &jobWg)
toCh := jobToWorkerCh
var afterExecuteJob func([]*metapb.Peer)
if balancer != nil {
toCh = balancer.innerJobToWorkerCh
afterExecuteJob = balancer.releaseStoreLoad
}
return local.startWorker(workerCtx, toCh, jobFromWorkerCh, afterExecuteJob, &jobWg)
})
}

@@ -1473,16 +1530,28 @@
}

jobWg.Wait()
workerCancel()
if balancer != nil {
intest.AssertFunc(func() bool {
allZero := true
balancer.storeLoadMap.Range(func(_, value any) bool {
if value.(int) != 0 {
allZero = false
return false
}
return true
})
return allZero
})
}
close(jobFromWorkerCh)
return nil
})
if err := workGroup.Wait(); err != nil {
if !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
firstErr.Set(err)

err := workGroup.Wait()
if err != nil && !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
return firstErr.Get()
return err
}

// GetImportedKVCount returns the number of imported KV pairs of some engine.
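
The long comment added to doImport above describes a job-accounting invariant: every job ref'ed into jobWg must be marked done exactly once by whichever component owns it when the flow stops, and components exit in a fixed order so nobody blocks on a channel whose reader is gone. Below is a self-contained sketch of that pattern under a simplified, linear pipeline without the retry cycle (which is why the worker input channel can simply be closed here, something the production code deliberately avoids); names and structure are illustrative, not the production code.

```go
package pipeline

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

type job struct{ id int }

// run wires a producer, a worker pool, and a collector in the way the doImport
// comment describes: jobWg counts live jobs, the producer waits on jobWg before
// closing the collector's input, and on cancellation whichever goroutine owns a
// job marks it done instead of forwarding it.
func run(ctx context.Context, total, workers int) error {
	g, gctx := errgroup.WithContext(ctx)
	toWorker := make(chan *job)   // unbuffered: channels never own jobs
	fromWorker := make(chan *job) // unbuffered
	var jobWg sync.WaitGroup

	// collector: terminates jobs (the job.done() equivalent is jobWg.Done()).
	g.Go(func() error {
		for {
			select {
			case <-gctx.Done():
				return nil
			case j, ok := <-fromWorker:
				if !ok {
					return nil
				}
				_ = j // a real collector would retry or record the job here
				jobWg.Done()
			}
		}
	})

	// workers: must mark a job done if they cannot forward it.
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for {
				select {
				case <-gctx.Done():
					return nil
				case j, ok := <-toWorker:
					if !ok {
						return nil
					}
					select {
					case <-gctx.Done():
						jobWg.Done() // the owner converts the job to "done"
						return nil
					case fromWorker <- j:
					}
				}
			}
		})
	}

	// producer: generates jobs, waits for all of them to be done, then closes
	// the collector's input so it exits in the happy path.
	g.Go(func() error {
		for i := 0; i < total; i++ {
			j := &job{id: i}
			jobWg.Add(1) // the job.ref() equivalent
			select {
			case <-gctx.Done():
				jobWg.Done()
				return nil
			case toWorker <- j:
			}
		}
		close(toWorker) // no retry cycle in this sketch, so closing is safe
		jobWg.Wait()
		close(fromWorker)
		return nil
	})

	return g.Wait()
}
```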
37 changes: 32 additions & 5 deletions pkg/lightning/backend/local/local_test.go
@@ -1092,7 +1092,7 @@ func TestLocalWriteAndIngestPairsFailFast(t *testing.T) {
jobCh := make(chan *regionJob, 1)
jobCh <- &regionJob{}
jobOutCh := make(chan *regionJob, 1)
err := bak.startWorker(context.Background(), jobCh, jobOutCh, nil)
err := bak.startWorker(context.Background(), jobCh, jobOutCh, nil, nil)
require.Error(t, err)
require.Regexp(t, "the remaining storage capacity of TiKV.*", err.Error())
require.Len(t, jobCh, 0)
@@ -1293,7 +1293,7 @@ func TestCheckPeersBusy(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := local.startWorker(ctx, jobCh, jobOutCh, nil)
err := local.startWorker(ctx, jobCh, jobOutCh, nil, nil)
require.NoError(t, err)
}()

@@ -1400,7 +1400,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
err := local.startWorker(ctx, jobCh, jobOutCh, nil, &jobWg)
require.NoError(t, err)
}()

@@ -1500,7 +1500,7 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
err := local.startWorker(ctx, jobCh, jobOutCh, nil, &jobWg)
require.NoError(t, err)
}()

@@ -1620,7 +1620,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
err := local.startWorker(ctx, jobCh, jobOutCh, nil, &jobWg)
require.NoError(t, err)
}()

@@ -2116,12 +2116,30 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
ingestData: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
region: &split.RegionInfo{
Region: &metapb.Region{
Peers: []*metapb.Peer{
{Id: 1, StoreId: 1},
{Id: 2, StoreId: 2},
{Id: 3, StoreId: 3},
},
},
},
},
{
keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
region: &split.RegionInfo{
Region: &metapb.Region{
Peers: []*metapb.Peer{
{Id: 4, StoreId: 4},
{Id: 5, StoreId: 5},
{Id: 6, StoreId: 6},
},
},
},
},
},
},
@@ -2131,6 +2149,15 @@
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
region: &split.RegionInfo{
Region: &metapb.Region{
Peers: []*metapb.Peer{
{Id: 7, StoreId: 7},
{Id: 8, StoreId: 8},
{Id: 9, StoreId: 9},
},
},
},
},
},
},
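
The test fixtures above now attach real region peers (stores 1-3, 4-6, and 7-9) to each job, since startWorker reports an executed job's peers back through the afterExecuteJob callback so the balancer can release the corresponding store load. A hedged sketch of the kind of helper that would turn those peers into store IDs for load accounting; storeIDsOfPeers is an illustrative name, not a function from the commit:

```go
package localtest

import "github.com/pingcap/kvproto/pkg/metapb"

// storeIDsOfPeers extracts the store IDs of a region's peers, e.g. the peers
// added to the TestRegionJobResetRetryCounter fixtures above. Illustrative
// helper only; the commit's storeBalancer may account for load differently.
func storeIDsOfPeers(peers []*metapb.Peer) []uint64 {
	ids := make([]uint64, 0, len(peers))
	for _, p := range peers {
		ids = append(ids, p.GetStoreId())
	}
	return ids
}
```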