Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into concurrency_merge…
Browse files Browse the repository at this point in the history
…_topn
  • Loading branch information
Yisaer committed Oct 14, 2022
2 parents 842a2fd + 61eed5c commit 187a6a2
Show file tree
Hide file tree
Showing 147 changed files with 3,440 additions and 957 deletions.
20 changes: 10 additions & 10 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2790,8 +2790,8 @@ def go_deps():
name = "com_github_pingcap_errors",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/errors",
sum = "h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4=",
version = "v0.11.5-0.20211224045212-9687c2b0f87c",
sum = "h1:3Dm0DWeQlwV8LbpQxP2tojHhxd9aY59KI+QN0ns6bBo=",
version = "v0.11.5-0.20220729040631-518f63d66278",
)
go_repository(
name = "com_github_pingcap_failpoint",
Expand All @@ -2818,8 +2818,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:/92S0s/TCoCmK2vv6WbkXNeqtLn90sHRJ5Vlx1Sigas=",
version = "v0.0.0-20220913025519-586cff113d10",
sum = "h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=",
version = "v0.0.0-20220929075948-06e08d5ed64c",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3422,15 +3422,15 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:TxDSQAmtGdE34BvOaYF35mRrAXePeZEq8quvuAwrKsI=",
version = "v2.0.1-0.20220923061703-33efe476e022",
sum = "h1:/13jzD/AR7v3dCLweFQ2JG8bihh3HLVIci2tbOHHGW0=",
version = "v2.0.1-0.20221012074856-6def8d7b90c4",
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=",
version = "v0.0.0-20220725055910-7187a7ab72db",
sum = "h1:REQOR1XraH1fT9BCoNBPZs1CAe+w7VPLU+d+si7DLYo=",
version = "v0.0.0-20221010134149-d50e5fe43f14",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down Expand Up @@ -4250,8 +4250,8 @@ def go_deps():
name = "org_golang_x_crypto",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/crypto",
sum = "h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=",
version = "v0.0.0-20210921155107-089bfa567519",
sum = "h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=",
version = "v0.0.0-20220411220226-7b82a4e95df4",
)
go_repository(
name = "org_golang_x_exp",
Expand Down
2 changes: 1 addition & 1 deletion Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ TEST_COVERAGE_DIR := "test_coverage"

ifneq ("$(CI)", "0")
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp
BAZEL_CMD_CONFIG := --config=ci
BAZEL_CMD_CONFIG := --config=ci --repository_cache=/home/jenkins/.tidb/tmp
endif
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package glue
import (
"context"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -42,7 +43,7 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -51,7 +52,7 @@ type Session interface {

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
Expand Down
13 changes: 7 additions & 6 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

Expand All @@ -231,7 +231,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore)
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
Expand All @@ -245,7 +245,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
Expand All @@ -259,7 +259,8 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore)

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
}

// Close implements glue.Session.
Expand Down Expand Up @@ -349,13 +350,13 @@ func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.P
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func InterpolateMySQLString(s string) string {
}

// TableExists return whether table with specified name exists in target db
func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, error) {
func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema, table).Scan(&exist)
Expand All @@ -309,6 +309,21 @@ func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, e
}
}

// SchemaExists return whether schema with specified name exists.
func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema).Scan(&exist)
switch {
case err == nil:
return true, nil
case err == sql.ErrNoRows:
return false, nil
default:
return false, errors.Annotatef(err, "check schema exists failed")
}
}

// GetJSON fetches a page and parses it as JSON. The parsed result will be
// stored into the `v`. The variable `v` must be a pointer to a type that can be
// unmarshalled from JSON.
Expand Down
28 changes: 26 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,15 @@ type restoreSchemaWorker struct {
func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.glue.GetParser(), sqlStr, job.dbName, job.tblName)
if err != nil {
return err
worker.logger.Warn("failed to rewrite statement, will use raw input instead",
zap.String("db", job.dbName),
zap.String("table", job.tblName),
zap.String("statement", sqlStr),
zap.Error(err))
job.stmts = []string{sqlStr}
} else {
job.stmts = stmts
}
job.stmts = stmts
return worker.appendJob(job)
}

Expand Down Expand Up @@ -656,7 +662,25 @@ loop:
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt)
if err != nil {
// try to imitate IF NOT EXISTS behavior for parsing errors
exists := false
switch job.stmtType {
case schemaCreateDatabase:
var err2 error
exists, err2 = common.SchemaExists(worker.ctx, session, job.dbName)
if err2 != nil {
task.Error("failed to check database existence", zap.Error(err2))
}
case schemaCreateTable:
exists, _ = common.TableExists(worker.ctx, session, job.dbName, job.tblName)
}
if exists {
err = nil
}
}
task.End(zap.ErrorLevel, err)

if err != nil {
err = common.ErrCreateSchema.Wrap(err).GenWithStackByArgs(common.UniqueTable(job.dbName, job.tblName), job.stmtType.String())
worker.wg.Done()
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ go_test(
],
embed = [":restore"],
flaky = True,
race = "on",
shard_count = 20,
deps = [
"//br/pkg/backup",
"//br/pkg/conn",
Expand Down Expand Up @@ -141,6 +143,7 @@ go_test(
"//types",
"//util/codec",
"//util/mathutil",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
38 changes: 29 additions & 9 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
Expand Down Expand Up @@ -173,6 +174,9 @@ type Client struct {

// see RestoreCommonConfig.WithSysTable
withSysTable bool

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -237,6 +241,26 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error {
return errors.Trace(err)
}

func (rc *Client) allocTableIDs(ctx context.Context, tables []*metautil.Table) error {
rc.preallocedTableIDs = tidalloc.New(tables)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error {
return rc.preallocedTableIDs.Alloc(meta.NewMeta(txn))
})
if err != nil {
return err
}

log.Info("registering the table IDs", zap.Stringer("ids", rc.preallocedTableIDs))
for i := range rc.dbPool {
rc.dbPool[i].registerPreallocatedIDs(rc.preallocedTableIDs)
}
if rc.db != nil {
rc.db.registerPreallocatedIDs(rc.preallocedTableIDs)
}
return nil
}

// SetPlacementPolicyMode to policy mode.
func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) {
switch strings.ToUpper(withPlacementPolicy) {
Expand Down Expand Up @@ -274,15 +298,6 @@ func (rc *Client) GetSupportPolicy() bool {
return rc.supportPolicy
}

// GetTruncateSafepoint read the truncate checkpoint from the storage bind to the client.
func (rc *Client) GetTruncateSafepoint(ctx context.Context) (uint64, error) {
ts, err := GetTSFromFile(ctx, rc.storage, TruncateSafePointFileName)
if err != nil {
log.Warn("failed to get truncate safepoint, using 0", logutil.ShortError(err))
}
return ts, err
}

func (rc *Client) GetDomain() *domain.Domain {
return rc.dom
}
Expand Down Expand Up @@ -733,6 +748,11 @@ func (rc *Client) GoCreateTables(
}
outCh := make(chan CreatedTable, len(tables))
rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter)
if err := rc.allocTableIDs(ctx, tables); err != nil {
errCh <- err
close(outCh)
return outCh
}

var err error

Expand Down
26 changes: 23 additions & 3 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand All @@ -24,7 +26,8 @@ import (

// DB is a TiDB instance, not thread-safe.
type DB struct {
se glue.Session
se glue.Session
preallocedIDs *prealloctableid.PreallocIDs
}

type UniqueTableName struct {
Expand Down Expand Up @@ -78,6 +81,10 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)
}, supportPolicy, nil
}

func (db *DB) registerPreallocatedIDs(ids *prealloctableid.PreallocIDs) {
db.preallocedIDs = ids
}

// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
Expand Down Expand Up @@ -272,6 +279,19 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table,
return nil
}

func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
return func(ti *model.TableInfo) bool {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
return !prealloced
}
}

// CreateTables execute a internal CREATE TABLES.
func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
Expand All @@ -289,7 +309,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
}
}
}
if err := batchSession.CreateTables(ctx, m); err != nil {
if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil {
return err
}

Expand All @@ -316,7 +336,7 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
}
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info)
err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter())
if err != nil {
log.Error("create table failed",
zap.Stringer("db", table.DB.Name),
Expand Down
Loading

0 comments on commit 187a6a2

Please sign in to comment.