Skip to content

Commit

Permalink
checker(dm): add a worker pool to concurrently working (#7796)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
lance6716 authored Dec 8, 2022
1 parent aac0f64 commit 2f65729
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 193 deletions.
79 changes: 50 additions & 29 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,35 +169,6 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
return nil, egErr
}

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok &&
c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical &&
c.stCfgs[0].Mode != config.ModeIncrement {
// TODO: concurrently read it intra-source later
for idx := range c.instances {
i := idx
eg.Go(func() error {
for _, sourceTables := range tableMapPerUpstream[i] {
for _, sourceTable := range sourceTables {
size, err2 := conn.FetchTableEstimatedBytes(
ctx,
c.instances[i].sourceDB,
sourceTable.Schema,
sourceTable.Name,
)
if err2 != nil {
return err2
}
info.totalDataSize.Add(size)
}
}
return nil
})
}
}
if egErr := eg.Wait(); egErr != nil {
return nil, egErr
}

info.targetTable2ExtendedColumns = extendedColumnPerTable
info.targetTable2SourceTablesMap = make(map[filter.Table]map[string][]filter.Table)
info.targetTableShardNum = make(map[filter.Table]int)
Expand Down Expand Up @@ -226,6 +197,8 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
info.sourceID2SourceTables = make(map[string][]filter.Table, len(c.instances))
info.sourceID2InterestedDB = make([]map[string]struct{}, len(c.instances))
info.sourceID2TableMap = make(map[string]map[filter.Table][]filter.Table, len(c.instances))
sourceIDs := make([]string, 0, len(c.instances))
dbs := make(map[string]*conn.BaseDB, len(c.instances))
for i, inst := range c.instances {
sourceID := inst.cfg.SourceID
info.sourceID2InterestedDB[i] = make(map[string]struct{})
Expand All @@ -237,7 +210,55 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
info.sourceID2InterestedDB[i][table.Schema] = struct{}{}
}
}
sourceIDs = append(sourceIDs, sourceID)
dbs[sourceID] = inst.sourceDB
}

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok &&
c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical &&
c.stCfgs[0].Mode != config.ModeIncrement {
concurrency, err := checker.GetConcurrency(ctx, sourceIDs, dbs, c.stCfgs[0].MydumperConfig.Threads)
if err != nil {
return nil, err
}

type job struct {
db *conn.BaseDB
schema string
table string
}

pool := checker.NewWorkerPoolWithContext[job, int64](ctx, func(result int64) {
info.totalDataSize.Add(result)
})
for i := 0; i < concurrency; i++ {
pool.Go(func(ctx context.Context, job job) (int64, error) {
return conn.FetchTableEstimatedBytes(
ctx,
job.db,
job.schema,
job.table,
)
})
}

for idx := range c.instances {
for _, sourceTables := range tableMapPerUpstream[idx] {
for _, sourceTable := range sourceTables {
pool.PutJob(job{
db: c.instances[idx].sourceDB,
schema: sourceTable.Schema,
table: sourceTable.Name,
})
}
}
}
err2 := pool.Wait()
if err2 != nil {
return nil, err2
}
}

return info, nil
}

Expand Down
Loading

0 comments on commit 2f65729

Please sign in to comment.