Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <[email protected]>
  • Loading branch information
hawkingrei committed Oct 8, 2024
1 parent a761f5a commit c43239a
Showing 1 changed file with 56 additions and 38 deletions.
94 changes: 56 additions & 38 deletions pkg/executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ package executor
import (
"archive/zip"
"bytes"
"cmp"
"context"
"encoding/json"
"fmt"
"os"
"slices"
"strings"

"github.com/BurntSushi/toml"
Expand All @@ -38,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/replayer"
"go.uber.org/zap"
Expand Down Expand Up @@ -484,52 +483,68 @@ func unzip(z *zip.File) (string, error) {
return bufa.String(), err
}

func compareRestoreTable(a, b *zip.File) int {
originTexta, err := unzip(a)
if err == nil {
return 0
}
originTextb, err := unzip(b)
if err == nil {
return 0
}
af := strings.Contains(originTexta, "FOREIGN KEY")
bf := strings.Contains(originTextb, "FOREIGN KEY")
if af && !bf {
return 1
} else if !af && bf {
return -1
} else if af && bf {
astmt, _, err := parser.New().ParseSQL(originTexta)
func getTableNameFromZipFilename(name string) string {
// dbName.tableName.schema.txt
s := strings.Split(name, ".")
intest.Assert(len(s) == 4, "invalid zip file name")
return s[0] + "." + s[1]
}

func topoSortTable(input []*zip.File) (result []*zip.File, err error) {
outMap := make(map[string]int) // table -> out degree
stmtCache := make(map[string]*ast.CreateTableStmt)
zipMap := make(map[string]*zip.File)
for _, f := range input {
originText, err := unzip(f)
if err != nil {
return 0
return nil, err
}
tableName := getTableNameFromZipFilename(f.Name)
zipMap[tableName] = f
if !strings.Contains(originText, "FOREIGN KEY") {
outMap[tableName] = 0
continue
}
bstmt, _, err := parser.New().ParseSQL(originTextb)
stmt, _, err := parser.New().ParseSQL(originText)
if err != nil {
return 0
return nil, err
}
at := astmt[len(astmt)-1].(*ast.CreateTableStmt)
bt := bstmt[len(bstmt)-1].(*ast.CreateTableStmt)
aFKCount, bFkCount := 0, 0
for _, c := range at.Constraints {
outMap[tableName] = 0
stmtCache[tableName] = stmt[len(stmt)-1].(*ast.CreateTableStmt)
for _, c := range stmtCache[tableName].Constraints {
if c.Tp == ast.ConstraintForeignKey {
aFKCount++
if c.Refer.Table.Name.L == bt.Table.Name.L {
return 1
}
outMap[tableName]++
}
}
for _, c := range bt.Constraints {
if c.Tp == ast.ConstraintForeignKey {
bFkCount++
if c.Refer.Table.Name.L == at.Table.Name.L {
return -1
}
cnt := 0
taskCount := len(outMap)
OUTLOOP:
for len(outMap) != 0 {
cnt++
if cnt > taskCount {
return nil, errors.New("plan replayer: unknown table")
}
for k, v := range outMap {
if v == 0 {
delete(outMap, k)
result = append(result, zipMap[k])
for name, stats := range stmtCache {
if outMap[name] == 0 {
continue
}
for _, c := range stats.Constraints {
if c.Tp == ast.ConstraintForeignKey &&
c.Refer.Table.Schema.L+"."+c.Refer.Table.Name.L == k {
outMap[name]--
}
}
}
continue OUTLOOP
}
}
return cmp.Compare(aFKCount, bFkCount)
}
return 0
return result, nil
}

// Update updates the data of the corresponding table.
Expand Down Expand Up @@ -557,7 +572,10 @@ func (e *PlanReplayerLoadInfo) Update(data []byte) error {
fss = append(fss, zipFile)
}
}
slices.SortStableFunc(fss, compareRestoreTable)
fss, err = topoSortTable(fss)
if err != nil {
return err
}
for _, f := range fss {
err = createSchemaAndItems(e.Ctx, f)
if err != nil {
Expand Down

0 comments on commit c43239a

Please sign in to comment.