Skip to content

Commit

Permalink
Merge branch 'master' into small-copr-opt
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 committed Oct 14, 2022
2 parents c0bb7b8 + 3ef8352 commit 51e4eac
Show file tree
Hide file tree
Showing 37 changed files with 727 additions and 93 deletions.
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package glue
import (
"context"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -42,7 +43,7 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -51,7 +52,7 @@ type Session interface {

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
Expand Down
13 changes: 7 additions & 6 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

Expand All @@ -231,7 +231,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore)
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
Expand All @@ -245,7 +245,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
Expand All @@ -259,7 +259,8 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore)

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
}

// Close implements glue.Session.
Expand Down Expand Up @@ -349,13 +350,13 @@ func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.P
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_test(
"//types",
"//util/codec",
"//util/mathutil",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
Expand Down Expand Up @@ -173,6 +174,9 @@ type Client struct {

// see RestoreCommonConfig.WithSysTable
withSysTable bool

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -237,6 +241,26 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error {
return errors.Trace(err)
}

func (rc *Client) allocTableIDs(ctx context.Context, tables []*metautil.Table) error {
rc.preallocedTableIDs = tidalloc.New(tables)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error {
return rc.preallocedTableIDs.Alloc(meta.NewMeta(txn))
})
if err != nil {
return err
}

log.Info("registering the table IDs", zap.Stringer("ids", rc.preallocedTableIDs))
for i := range rc.dbPool {
rc.dbPool[i].registerPreallocatedIDs(rc.preallocedTableIDs)
}
if rc.db != nil {
rc.db.registerPreallocatedIDs(rc.preallocedTableIDs)
}
return nil
}

// SetPlacementPolicyMode to policy mode.
func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) {
switch strings.ToUpper(withPlacementPolicy) {
Expand Down Expand Up @@ -724,6 +748,11 @@ func (rc *Client) GoCreateTables(
}
outCh := make(chan CreatedTable, len(tables))
rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter)
if err := rc.allocTableIDs(ctx, tables); err != nil {
errCh <- err
close(outCh)
return outCh
}

var err error

Expand Down
26 changes: 23 additions & 3 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand All @@ -24,7 +26,8 @@ import (

// DB is a TiDB instance, not thread-safe.
type DB struct {
se glue.Session
se glue.Session
preallocedIDs *prealloctableid.PreallocIDs
}

type UniqueTableName struct {
Expand Down Expand Up @@ -78,6 +81,10 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)
}, supportPolicy, nil
}

func (db *DB) registerPreallocatedIDs(ids *prealloctableid.PreallocIDs) {
db.preallocedIDs = ids
}

// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
Expand Down Expand Up @@ -272,6 +279,19 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table,
return nil
}

func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
return func(ti *model.TableInfo) bool {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
return !prealloced
}
}

// CreateTables execute a internal CREATE TABLES.
func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
Expand All @@ -289,7 +309,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
}
}
}
if err := batchSession.CreateTables(ctx, m); err != nil {
if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil {
return err
}

Expand All @@ -316,7 +336,7 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
}
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info)
err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter())
if err != nil {
log.Error("create table failed",
zap.Stringer("db", table.DB.Name),
Expand Down
77 changes: 77 additions & 0 deletions br/pkg/restore/prealloc_table_id/alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package prealloctableid

import (
"fmt"
"math"

"github.com/pingcap/tidb/br/pkg/metautil"
)

// Allocator is the interface needed to allocate table IDs.
type Allocator interface {
GetGlobalID() (int64, error)
AdvanceGlobalIDs(n int) (int64, error)
}

// PreallocIDs mantains the state of preallocated table IDs.
type PreallocIDs struct {
end int64

allocedFrom int64
}

// New collects the requirement of prealloc IDs and return a
// not-yet-allocated PreallocIDs.
func New(tables []*metautil.Table) *PreallocIDs {
if len(tables) == 0 {
return &PreallocIDs{
allocedFrom: math.MaxInt64,
}
}

max := int64(0)

for _, t := range tables {
if t.Info.ID > max {
max = t.Info.ID
}
}
return &PreallocIDs{
end: max + 1,

allocedFrom: math.MaxInt64,
}
}

// String implements fmt.Stringer.
func (p *PreallocIDs) String() string {
if p.allocedFrom >= p.end {
return fmt.Sprintf("ID:empty(end=%d)", p.end)
}
return fmt.Sprintf("ID:[%d,%d)", p.allocedFrom, p.end)
}

// preallocTableIDs peralloc the id for [start, end)
func (p *PreallocIDs) Alloc(m Allocator) error {
currentId, err := m.GetGlobalID()
if err != nil {
return err
}
if currentId > p.end {
return nil
}

alloced, err := m.AdvanceGlobalIDs(int(p.end - currentId))
if err != nil {
return err
}
p.allocedFrom = alloced
return nil
}

// Prealloced checks whether a table ID has been successfully allocated.
func (p *PreallocIDs) Prealloced(tid int64) bool {
return p.allocedFrom <= tid && tid < p.end
}
85 changes: 85 additions & 0 deletions br/pkg/restore/prealloc_table_id/alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package prealloctableid_test

import (
"fmt"
"testing"

"github.com/pingcap/tidb/br/pkg/metautil"
prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

type testAllocator int64

func (t *testAllocator) GetGlobalID() (int64, error) {
return int64(*t), nil
}

func (t *testAllocator) AdvanceGlobalIDs(n int) (int64, error) {
old := int64(*t)
*t = testAllocator(int64(*t) + int64(n))
return old, nil
}

func TestAllocator(t *testing.T) {
type Case struct {
tableIDs []int64
hasAllocatedTo int64
successfullyAllocated []int64
shouldAllocatedTo int64
}

cases := []Case{
{
tableIDs: []int64{1, 2, 5, 6, 7},
hasAllocatedTo: 6,
successfullyAllocated: []int64{6, 7},
shouldAllocatedTo: 8,
},
{
tableIDs: []int64{4, 6, 9, 2},
hasAllocatedTo: 1,
successfullyAllocated: []int64{2, 4, 6, 9},
shouldAllocatedTo: 10,
},
{
tableIDs: []int64{1, 2, 3, 4},
hasAllocatedTo: 5,
successfullyAllocated: []int64{},
shouldAllocatedTo: 5,
},
}

run := func(t *testing.T, c Case) {
tables := make([]*metautil.Table, 0, len(c.tableIDs))
for _, id := range c.tableIDs {
tables = append(tables, &metautil.Table{
Info: &model.TableInfo{
ID: id,
},
})
}

ids := prealloctableid.New(tables)
allocator := testAllocator(c.hasAllocatedTo)
require.NoError(t, ids.Alloc(&allocator))

allocated := make([]int64, 0, len(c.successfullyAllocated))
for _, t := range c.tableIDs {
if ids.Prealloced(t) {
allocated = append(allocated, t)
}
}
require.ElementsMatch(t, allocated, c.successfullyAllocated)
require.Equal(t, int64(allocator), c.shouldAllocatedTo)
}

for i, c := range cases {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
run(t, c)
})
}
}
Loading

0 comments on commit 51e4eac

Please sign in to comment.