br: batch ddl for binlog (#31028)
* executor: migrate test-infra to testify for executor/show_stats_test.go (#30000)

* ddl/table_split_test.go: migrate test-infra to testify (#29960)

* *: fix unstable test caused by TestRandomPanicAggConsume (#30035)

* ddl/ddl_test.go: refactor testSchemaInfo to return error #29964 (#30008)

* executor: remove useless parameter (#30043)

* executor: fix error msg of granting non-table level privilege (#29321)

* server: fix bug https://asktug.com/t/topic/213082/11 (#29577)

* *: replace compareDatum by compare (#30048)

* executor, store: replace compareDatum by compare (#30044)

* executor: migrate test-infra to testify for executor/write_test.go (#29953)

* planner: add sub plan info of shuffleReceiver when query explain analyze (#27992)

* expression: fix wrong flen for CastAsString function (#29563)

* br: migrate test-infra to testify for redact (#29747)

* ddl: stable create expression index test (#29990)

* *: replace compareDatum by compare (#30060)

* planner: fix the issue that binding cannot work when sql_select_limit is enabled  (#29789)

* br: migrate test-infra to testify for lightning/backend/tidb/tidb_test.go (#30042)

* *: fix unstable test in placement gc cases (#30045)

* lightning: let ignore columns be compatible with tidb backend (#27850)

* statistics: replace compareDatum by compare (#30052)

* br/pkg/mock: migrate test-infra to testify (#30034)

* ddl: forbid alter table cache in system db (#29998)

* topsql: distinguish the row and index operation type (#29044)

* ddl, util/codec: fix add index OOM and prevent panic in logging (#29925)

* br/pkg/utils: migrate tests to testify (#30032)

* expression, sessionctx: support rand_seed1 and rand_seed2 sysvar (#29936)

* *: track the memory usage of IndexJoin more accurately (#29068)

* br: migrate test-infra to testify for lightning/backend/importer (#30073)

* planner: fix panic when the join key is scalarFunction (#30002)

* ddl: fix missing info of `alter table placement` (#29929)

* expression: cast charset according to the function's resulting charset (#29905)

* *: replace compareDatum by compare and fix compare (#30090)

* store/driver: Use BatchGet method supported in client-go. (#29860)

* privilege: disable role privilege when it is revoked from a user (#30028)

* partition: Show partition reformat (#29945)

* planner: fix the unstable test `TestPartitionWithVariedDatasources` (#30139)

* server: Add a `MockConn` to enable testing connections (#30119)

* *: skip unstable test (#29433)

* skip test

Signed-off-by: yisaer <[email protected]>

* skip unstable test

Signed-off-by: yisaer <[email protected]>

* table/tables: add `StateRemote` interface for the cached table (#29152)

* planner: rebuild range when the range is empty (#30003)

* planner: support dump file for trace plan statement (#30059)

* planner: make clear for MaybeOverOptimized4PlanCache (#29782)

* update: fix the updatable table name resolution in build update list (#30061)

* expression: prevent function DATE_ADD/SUB_STRING_XXX from being pushed down to TiFlash (#30154)

* *: remove unused profile memory tracker from global tracker (#30143)

* expression, util/codec: fix wrongly eliminated conditions caused by `HashCode()` collision (#30120)

* cmd, expression: fix convert binary string to gbk (#30004)

* sessionctx/variable: make lc_time_names read only (#30084)

* expression: fix misuse of datumsToConstants in GBK test (#30072)

* plugin: add more tests to cover audit logs (#30165)

* planner: consider prefix index column length in skyline pruning (#27527)

* server: support download optimize trace file (#30150)

* expression: add ut config (#30156)

* ddl/ddl_test.go: refactor testTableInfo to return error (#30069)

* *: skip a case in TestClusterTables/ForClusterServerInfo on mac M1 (#29933)

* planner: do not add extra limit when handle the execute stmt (#30152)

* ddl: migrate test-infra to testify for db_cache_test.go (#30117)

* tables: fix data race in mockStatRemoteData (#30180)

* server: Port missing in processlist (#30183)

* server: add build not race flag (#30184)

* unistore: get/batchGet/scan support read-through-lock (#29898)

* topsql: fix nil pointer panic in stmtctx (#30181)

* types: fix str_to_date() so that microseconds with leading zeros are converted correctly (#30122)

* dumpling: Add support for `Create Placement` (#29724)

* *: add cardinality estimation trace for `Selectivity` (#29883)

* *: replace compareDatum by compare and ignore warnings if collate is empty (#30105)

* planner: implement aggregation eliminate optimize trace (#30114)

* server: Combined fix for authentication issues (#29738)

* executor: fix data race (#30186)

* dumpling: add a function for the variable call of dm (#30033)

* planner: revise optimize trace logic (#30163)

* statistics: fix unstable test case TestTraceCE (#30235)

* lightning: avoid retry write rows and record duplicate rows in tidb backend (#30179)

* executor: send a task with error to the resultCh when a panic happens (#30214)

* docs: update placement policy limits (#30202)

* expression: Support GBK for builtin function AesEncrypt. (#29946)

* executor: remove useless log (#30248)

* docs: Update 2021-09-29-secure-bootstrap.md (#30243)

* executor: fix unstable test of topsql (#30257)

* copr: add paging API for streaming-like process (#29612)

* *: forbid setting TiFlash replica for a table with an unsupported charset (#30162)

* privilege, session, server: consistently map user login to identity (#30204)

* expression: fix flen for timestamp_add (#30213)

* types: check whether values equal NaN or Inf when converting to float (#30148)

* parser: respect TiDB comment when DROP INDEX IF EXISTS (#30173)

* ddl: disallow change columns from zero int to time (#25728)

* *: statement summary should know the slow write is blocked on read lock lease (#30166)

* planner: regard NULL as point when accessing composite index (#30244)

* planner: Add trace for proj elimination rule (#30275)

* privilege,session: Match loopback connections to 'localhost' accounts (#29995) (#30189)

* ddltest: refactor logutil.InitLogger in ddltest to avoid data race (#30299)

* load data: fix bug when loading data with long content (#29222)

* topsql: reduce data race of sql digest (#30296)

* ddl: Do not consider the clustered index when checking the length of the secondary index (#29660)

* *: update client-go to use resolveForRead (#30314)

* ddl/ddl_algorithm_test.go: migrate test-infra to testify (#30268)

* store: Add metrics for pd api call time (#30062)

* *: replace compareDatum by compare and fix wrong optimize order by (#30273)

* metrics/grafana: fix display for 'Start TSO Wait Duration' panel (#30311)

* ddl: migrate test-infra to testify for ddl/options_test.go (#30269)

* planner: fix inconsistent schema between UnionAll and child operator (#30231)

* test: fix incorrect regexp pattern during migrating test (#30080)

* br: add more precise check for lock file (#30218)

* executor: replace should not change other rows when auto ID is out of range (#30301)

* tidb-server: return 1 while failing to set cpu affinity (#30197)

* executor: migrate test-infra to testify for executor/index_lookup_join_test.go (#30260)

* expression: fix wrong result of greatest/least(mixed unsigned/signed int) (#30121)

* types: casting JSON literal `null` to number types should fail (#30278)

* executor: fix data race in oomtest (#30339)

* parser: fix missing charset and collation of handle column (#30320)

* expression: don't append null when param for char() is null (#30236)

* *: add warn log for stale read (#30340)

* parser: support multiple brackets for subquery (#29656)

* session, sessionctx/variable: fix validation recursion bug (#30293)

* planner: Add trace for agg pushdown rule (#30262)

* planner/core: fix a data race when building plan for cached table (#30355)

* executor: avoid sum from avg overflow (#30010)

* config: make EnableSlowLog atomic (#30346)

* *: fix goroutine leak in ddl integration test (#30369)

* executor: make projection executor unparallel for insert/update/delete (#30290)

* executor, util: reset offsets and nullBitMap for MutRow when setting new values (#30265)

* expression: fix tidb can't alter table from other-type with null value to timestamp with NOT NULL attribute (#29664)

* dumpling: fix default collation with upstream when dump database and table (#30292)

* ddl: fix the enum default value by trimming trailing space (#30356)

* expression: migrate test-infra to testify for flag_simplify_test.go (#30407)

* server: refine code logic in handleDownloadFile (#30422)

* refine logic

Signed-off-by: yisaer <[email protected]>

* fix

Signed-off-by: yisaer <[email protected]>

* ddl: migrate test-infra to testify for ddl/table_test.go (#30267)

* ddl: handle the error from `addBatchDDLJobs()` correctly (#30401)

* br: fix the integration tests (#30423)

* util, cmd: remove unused filesort (#30438)

* *: update client-go for small backoff time (#30436)

* server: Fix unstable tests with FakeAuthSwitch (#30287)

* dumpling: fix dump failed when sequence exists (#30164)

* *: replace compareDatum by compare (#30421)

* lightning: fix gcs max key limit (#30393)

* expression, parser: add built-in func is_uuid (#30318)

* expression: migrate test-infra to testify for constant_fold_test.go (#30424)

* executor: fix pipelined window invalid memory address (#30418)

* makefile: add gotestsum for verify ci (#29848)

* server: close sql rows to fix unstable test (#30306)

* Makefile: add coverage record for BR and Dumpling (#30457)

* executor: track the mem usage of IndexMergeReader (#30210)

* infosync: close body when ReadAll encounters error (#30462)

* planner: show accessed partition when explain mpp query over partition table (#30367)

* *: Fix use of user identity in SHOW GRANTS + error messages (#30294)

* ddl: add not null flag for auto_increment column  (#30477)

* expression: make some unstable test serial (#30323)

* expression: migrate test-infra to testify for constant_propagation_test.go (#30430)

* executor: stable test TestSetDDLReorgBatchSize and TestSetDDLReorgWorkerCnt (#30480)

* statistics, util/ranger: add cardinality estimation trace for `GetRowCountBy...` (#30321)

* *: skip mysql client goroutine leak detection in integration ddl (#30467)

* executor,util: write slow query to slow log no matter what log level (#30461)

* executor: enable index_merge used in transaction. (#29875)

* logutil: add testcase for SlowQueryLogger.MaxDays/MaxSize/MaxBackups (#30316)

* expression: fix data race in builtin_other_vec_generated_test.go (#30503)

* expression: fix data race in the collationInfo (#30490)

* planner/core, session: fix error message of wrong variable scope (#30510)

* lightning: support Re/ReregisterMySQL by different tls name (#30463)

* executor: TestBatchGetandPointGetwithHashPartition test typo (#29669) (#29671)

* mockstore: improve log to avoid panic for nil pointer (#30513)

* *: replace compareDatum by compare, PR 10 (#30456)

* planner: Disable dynamic partition prune mode for all non-autocommit (#27532) (#30505)

* expression: change the log level of a confusing log from warn to debug (#30484)

* br: Check crypter.key valid before backup (#29991)

* *: replace compareDatum by compare, PR 11 (#30465)

* dumpling: fix default column collation with upstream when dump table (#30531)

* server: fix prepared cursor select (#30285)

* executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471)

* parser, expression: follow mysql, increase interval precedence (#30528)

* makefile: set timeout 25m for make race (#30555)

* planner: fix the unstable test TestAnalyzeGlobalStatsWithOpts/2 (#30576)

* expression,types: Adjusts UNIX_TIMESTAMP() for non-existing DST values (#28739) (#30405)

* br: add res.Body.close to avoid leak (#30545)

* lightning: add back integration test lightning_error_summary (#30547)

* sessionctx/variable: small refactor (split large file) (#30511)

* ddl: let `admin cancel ddl jobs` run in a new transaction (#30549)

* *: Retry when placement PutBundles failed (#30590)

* dumpling: delete unit test in github actions (#30562)

* *: support trace plan target='estimation' statement (#30491)

* expression: migrate test-infra to testify for integration_test.go (#30548)

* planner: support trace for min/max eliminate (#30441)

* support min/max trace

Signed-off-by: yisaer <[email protected]>

* address the comment

Signed-off-by: yisaer <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

* br: remove cdclog in br (#30573)

* *: show cmd to check if all needed histograms are loaded (#29672)

* expression: clone repertoire when clone the scalar function (#30602)

* *: use the real StateRemote interface implementation for cached table (#30066)

* *: query failed after add index / timestamp out-of-range (#28424) (#29323)

* planner: implement collecting predicate columns from logical plan (#29878)

* *: show PK name when decoding the clustered index row key (#30623)

* ddl/callback_test.go: migrate test-infra to testify (#30317)

* *: Rename some names of placement ddl operation (#30622)

* executor: fix data race in the index_lookup_hash_join (#30619)

* ddl: remove unnecessary locking when adding an index (#29772)

* server: try to make `TidbTestSuite` more stable (#30643)

* *: Add some PD tests for placement and fix some bug found (#30621)

* *: migrate sync.WaitGroup to util.WaitGroupWrapper (#30644)

* planner: add trace for join eliminate rule (#30343)

* executor: migrate test-infra to testify for executor/shuffle_test.go (#30514)

* planner: make (*AccessPath).OnlyPointRange more succinct (#30520)

* planner: add trace for join reorder (#30394)

* executor: migrate test-infra to testify for executor/union_scan_test.go (#30525)

* expression: make cast return error if cast binary literal to another character set (#30537)

* *: update tikv client (#30670)

* *: update sysutil in go.mod to fix panic when search log (#30523)

* topsql: shouldn't evict the SQL meta, since the evicted SQL can appear in other components' (TiKV) TopN records (#27050)

* testify: migrate test-infra to testify for analyze_test.go (#30640)

* util: replace compareDatum by compare,  point part (#30575)

* test: make all the tests run in serial (#30692)

* statistics: add mutex for Handle.globalMap and Handle.feedback (#30550)

* executor: fix regular expression in json so that it can match identifiers starting with '$' (#29750)

* util/testkit/testkit.go: fix typo (#30638)

* planner: Introduce a new global variable to control the historical statistics feature (#30646)

* topsql: introduce datasink interface (#30662)

* planner: unify the argument of stats functions to use SessionCtx instead of StatementContext (#30668)

* metrics: fix the Max SafeTS Gap metrics (#30689)

* lightning: Add source dir existence check for s3 (#30674)

* golangci-lint: support durationcheck (#30027)

* executor: fix data race on IndexHashJoin.cancelFunc (#30701)

* sessionctx/variable: change tidb_store_limit to global only (#30522)

* statistics: remove reassignment of Handle.pool in NewHandle (#30675)

* br: fix some unstable unit test cases. (#30716)

* bindinfo: fix the comment typo (#30616)

* server: support decoding prepared string args to character_set_client (#30723)

* expression: fix enum type join binary get wrong result (#30445)

* cmd/explaintest: fix wrong result comparison for explain test (#30717)

* create tables in parallel in br

* metrics: fix copr-cache metrics (#30712)

* test: merge executor's serial tests to other tests (#30711)

* ddl: add batch create table api

Signed-off-by: xhe <[email protected]>

* ddl: add unit tests

Signed-off-by: xhe <[email protected]>

* ddl: fix fmt

Signed-off-by: xhe <[email protected]>

* ddl: typo

Co-authored-by: Arenatlx <[email protected]>

* ddl: fix tests

Signed-off-by: xhe <[email protected]>

* ddl: rename to BatchCreateTableWithInfo

Signed-off-by: xhe <[email protected]>

* ddl: trace the error

Signed-off-by: xhe <[email protected]>

* ddl: comments

Signed-off-by: xhe <[email protected]>

* ddl: cancel the job right

Signed-off-by: xhe <[email protected]>

* ddl: cancel the job right 2

Signed-off-by: xhe <[email protected]>

* ddl: report error if entry too large

Signed-off-by: xhe <[email protected]>

* ddl: report error when table is duplicated

Signed-off-by: xhe <[email protected]>

* ddl: go fmt

Signed-off-by: xhe <[email protected]>

* infoschema: improve batch memory perf

Signed-off-by: xhe <[email protected]>

* ddl: retain ID

Signed-off-by: xhe <[email protected]>

* ddl: reduce log frequency

Signed-off-by: xhe <[email protected]>

* ddl: fix tests

Signed-off-by: xhe <[email protected]>

* fix merge issue

* fix issue that caused table loss during restore

* refactoring code

* refactoring code

Co-authored-by: Weizhen Wang <[email protected]>
Co-authored-by: wangggong <[email protected]>
Co-authored-by: 王超 <[email protected]>
Co-authored-by: José Clovis Ramírez de la Rosa <[email protected]>
Co-authored-by: unconsolable <[email protected]>
Co-authored-by: zhangguangchao <[email protected]>
Co-authored-by: wjHuang <[email protected]>
Co-authored-by: mmyj <[email protected]>
Co-authored-by: Shenghui Wu <[email protected]>
Co-authored-by: tison <[email protected]>
Co-authored-by: Yuanjia Zhang <[email protected]>
Co-authored-by: xiaolunzhou <[email protected]>
Co-authored-by: glorv <[email protected]>
Co-authored-by: sylzd <[email protected]>
Co-authored-by: Yexiang Zhang <[email protected]>
Co-authored-by: tangenta <[email protected]>
Co-authored-by: Yujie Xia <[email protected]>
Co-authored-by: Hangjie Mo <[email protected]>
Co-authored-by: HuaiyuXu <[email protected]>
Co-authored-by: Song Gao <[email protected]>
Co-authored-by: xiongjiwei <[email protected]>
Co-authored-by: WangDeng <[email protected]>
Co-authored-by: Mattias Jonsson <[email protected]>
Co-authored-by: djshow832 <[email protected]>
Co-authored-by: tiancaiamao <[email protected]>
Co-authored-by: Chengpeng Yan <[email protected]>
Co-authored-by: Arenatlx <[email protected]>
Co-authored-by: Meng Xin <[email protected]>
Co-authored-by: Zhou Kunqin <[email protected]>
Co-authored-by: Youra Cho <[email protected]>
Co-authored-by: Yifan Xu <[email protected]>
Co-authored-by: Xiang Zhang <[email protected]>
Co-authored-by: Lei Zhao <[email protected]>
Co-authored-by: Daniël van Eeden <[email protected]>
Co-authored-by: docsir <[email protected]>
Co-authored-by: Morgan Tocker <[email protected]>
Co-authored-by: jakevin <[email protected]>
Co-authored-by: 姬小野 <[email protected]>
Co-authored-by: you06 <[email protected]>
Co-authored-by: lance6716 <[email protected]>
Co-authored-by: TonsnakeLin <[email protected]>
Co-authored-by: imentu <[email protected]>
Co-authored-by: Alkaid <[email protected]>
Co-authored-by: lvtu <[email protected]>
Co-authored-by: 3pointer <[email protected]>
Co-authored-by: Zhuhe Fang <[email protected]>
Co-authored-by: WizardXiao <[email protected]>
Co-authored-by: Jianjun Liao <[email protected]>
Co-authored-by: guo-shaoge <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: xufei <[email protected]>
Co-authored-by: Ehco <[email protected]>
Co-authored-by: Zak Zhao <[email protected]>
Co-authored-by: xhe <[email protected]>
Co-authored-by: Chunzhu Li <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: Xiaoju Wu <[email protected]>
Co-authored-by: JmPotato <[email protected]>
Co-authored-by: Zach <[email protected]>
Co-authored-by: bb7133 <[email protected]>
Co-authored-by: crazycs <[email protected]>
Co-authored-by: znhh6018 <[email protected]>
Co-authored-by: eddie lin <[email protected]>
Co-authored-by: dongjunduo <[email protected]>
Co-authored-by: Zhenchi <[email protected]>
Co-authored-by: zhangjinpeng1987 <[email protected]>
Co-authored-by: Jack Yu <[email protected]>
Co-authored-by: Arenatlx <[email protected]>
Showing 20 changed files with 543 additions and 396 deletions.
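
The core of this change is in br/pkg/restore/client.go below: GoCreateTables now submits the restored tables to the DDL layer in batches and, only when the cluster rejects the batched job with `[ddl:8204]invalid ddl job`, falls back to the old one-table-per-DDL path. The following sketch illustrates that control flow only; the names (createAllTables, createBatch, createOne, TableInfo) are simplified stand-ins, not the actual BR signatures.

package restoresketch

import (
	"context"
	"strings"
)

// TableInfo is a simplified stand-in for *model.TableInfo.
type TableInfo struct{ Name string }

// createAllTables sketches the batching-with-fallback flow of GoCreateTables:
// createBatch stands in for the batched path (createTablesInWorkerPool /
// BatchCreateTableWithInfo), createOne for the old per-table path.
func createAllTables(
	ctx context.Context,
	tables []*TableInfo,
	batchSize int,
	createBatch func(context.Context, []*TableInfo) error,
	createOne func(context.Context, *TableInfo) error,
) error {
	if batchSize > 0 {
		var batchErr error
		for start := 0; start < len(tables); start += batchSize {
			end := start + batchSize
			if end > len(tables) {
				end = len(tables)
			}
			if batchErr = createBatch(ctx, tables[start:end]); batchErr != nil {
				break
			}
		}
		if batchErr == nil {
			return nil
		}
		// Only the "invalid ddl job" error from an older cluster triggers the
		// sequential fallback; any other error is returned as-is.
		if !strings.Contains(batchErr.Error(), "[ddl:8204]invalid ddl job") {
			return batchErr
		}
	}
	// Fall back to (or, when batching is disabled, start with) one DDL per table.
	for _, t := range tables {
		if err := createOne(ctx, t); err != nil {
			return err
		}
	}
	return nil
}
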
2 changes: 1 addition & 1 deletion br/pkg/glue/glue.go
@@ -13,7 +13,7 @@ import (

// BulkCreateTableSession is an interface to create tables in bulk in parallel.
type BulkCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error
}

// Glue is an abstraction of TiDB function calls used in BR.
7 changes: 4 additions & 3 deletions br/pkg/gluetidb/glue.go
@@ -131,9 +131,9 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error {
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
d := domain.GetDomain(gs.se).DDL()
log.Info("tidb start create tables", zap.Uint("batchDdlSize", batchDdlSize))
log.Info("tidb start create tables")
var dbName model.CIStr
cloneTables := make([]*model.TableInfo, 0, len(tables))

@@ -159,7 +159,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore, true)
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore)

if err != nil {
log.Info("Bulk create table from tidb failure, it possible caused by version mismatch with BR.", zap.String("Error", err.Error()))
return err
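
The gluetidb session above is where a whole batch of cloned TableInfos is handed to the DDL layer in one call, after the concatenated CREATE TABLE statements are recorded as the session's query string (presumably so that downstream consumers such as binlog see the batch as a single statement). A minimal sketch of that call-site shape, using locally defined stand-in types rather than the real TiDB interfaces:

package gluesketch

import (
	"context"
	"strings"
)

// TableInfo is a simplified stand-in for *model.TableInfo; Query is assumed to
// carry the original CREATE TABLE statement for the table.
type TableInfo struct {
	DB, Name, Query string
}

// BatchCreator stands in for the batch-create DDL entry point
// (BatchCreateTableWithInfo); the real method takes TiDB's own types and an
// on-exist policy such as ddl.OnExistIgnore.
type BatchCreator interface {
	BatchCreateTableWithInfo(ctx context.Context, db string, infos []*TableInfo) error
}

// QueryRecorder stands in for se.SetValue(sessionctx.QueryString, ...).
type QueryRecorder interface {
	SetQueryString(q string)
}

// createTablesBatch mirrors the shape of tidbSession.CreateTables above.
func createTablesBatch(ctx context.Context, d BatchCreator, se QueryRecorder, db string, infos []*TableInfo) error {
	var queryBuilder strings.Builder
	for _, info := range infos {
		queryBuilder.WriteString(info.Query)
		queryBuilder.WriteString(";")
	}
	// Record the combined statement before submitting a single DDL job for the batch.
	se.SetQueryString(queryBuilder.String())
	return d.BatchCreateTableWithInfo(ctx, db, infos)
}
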
226 changes: 158 additions & 68 deletions br/pkg/restore/client.go
@@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/opentracing/opentracing-go"
@@ -81,6 +82,7 @@ type Client struct {

restoreStores []uint64

cipher *backuppb.CipherInfo
storage storage.ExternalStorage
backend *backuppb.StorageBackend
switchModeInterval time.Duration
@@ -135,6 +137,10 @@ func (rc *Client) SetRateLimit(rateLimit uint64) {
rc.rateLimit = rateLimit
}

func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo) {
rc.cipher = crypter
}

// SetStorage set ExternalStorage for client.
func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
@@ -415,7 +421,7 @@ func (rc *Client) createTables(
if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID")
} else {
err := db.CreateTables(ctx, tables, rc.GetBatchDdlSize())
err := db.CreateTables(ctx, tables)
if err != nil {
return nil, errors.Trace(err)
}
@@ -451,11 +457,12 @@ func (rc *Client) createTable(
dom *domain.Domain,
table *metautil.Table,
newTS uint64,
ddlTables map[UniqueTableName]bool,
) (CreatedTable, error) {
if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
err := db.CreateTable(ctx, table)
err := db.CreateTable(ctx, table, ddlTables)
if err != nil {
return CreatedTable{}, errors.Trace(err)
}
@@ -494,62 +501,71 @@ func (rc *Client) GoCreateTables(
// Could we have a smaller size of tables?
log.Info("start create tables")

ddlTables := rc.DDLJobsMap()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.GoCreateTables", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
outCh := make(chan CreatedTable, len(tables))
rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter)
err := rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh)
//cts, err := rc.createTables(ctx, rc.db, dom, tables, newTS)

if err == nil {
defer close(outCh)
// fall back to old create table (sequential create table)
} else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") {
log.Info("fall back to the old DDL way to create table.")
createOneTable := func(c context.Context, db *DB, t *metautil.Table) error {
select {
case <-c.Done():
return c.Err()
default:
}
rt, err := rc.createTable(c, db, dom, t, newTS)
if err != nil {
log.Error("create table failed",
zap.Error(err),
zap.Stringer("db", t.DB.Name),
zap.Stringer("table", t.Info.Name))
return errors.Trace(err)
}
log.Debug("table created and send to next",
zap.Int("output chan size", len(outCh)),
zap.Stringer("table", t.Info.Name),
zap.Stringer("database", t.DB.Name))
outCh <- rt
rater.Inc()
rater.L().Info("table created",
zap.Stringer("table", t.Info.Name),
zap.Stringer("database", t.DB.Name))
return nil
}
go func() {
var err error = nil
if rc.batchDllSize > 0 {
err = rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh)

if err == nil {
log.Info("bulk to create tables success.")
defer close(outCh)
defer log.Debug("all tables are created")
var err error
if len(dbPool) > 0 {
err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool)
} else {
err = rc.createTablesWithSoleDB(ctx, createOneTable, tables)
}
if err != nil {
errCh <- err
}
}()
} else {
errCh <- err
// fall back to old create table (sequential create table)
} else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") {
log.Info("fall back to the old DDL way to create table.")
} else {
log.Error("bulk to create tables failure.")
errCh <- err
return outCh
}
}

createOneTable := func(c context.Context, db *DB, t *metautil.Table) error {
select {
case <-c.Done():
return c.Err()
default:

}
rt, err := rc.createTable(c, db, dom, t, newTS, ddlTables)
if err != nil {
log.Error("create table failed",
zap.Error(err),
zap.Stringer("db", t.DB.Name),
zap.Stringer("table", t.Info.Name))
return errors.Trace(err)
}
log.Debug("table created and send to next",
zap.Int("output chan size", len(outCh)),
zap.Stringer("table", t.Info.Name),
zap.Stringer("database", t.DB.Name))
outCh <- rt
rater.Inc()
rater.L().Info("table created",
zap.Stringer("table", t.Info.Name),
zap.Stringer("database", t.DB.Name))
return nil
}
go func() {
defer close(outCh)
defer log.Debug("all tables are created")
var err error
if len(dbPool) > 0 {
err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool)
} else {
err = rc.createTablesWithSoleDB(ctx, createOneTable, tables)
}
if err != nil {
errCh <- err
}
}()
return outCh
}

@@ -585,7 +601,9 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
workers := utils.NewWorkerPool(uint(len(dbPool)), "Create Tables Worker")
numOfTables := len(tables)
lastSent := 0
for i := int(rc.batchDllSize); i <= numOfTables; i = i + int(rc.batchDllSize) {

for i := int(rc.batchDllSize); i < numOfTables+int(rc.batchDllSize); i = i + int(rc.batchDllSize) {

log.Info("create tables", zap.Int("table start", lastSent), zap.Int("table end", i))
if i > numOfTables {
i = numOfTables
@@ -727,7 +745,7 @@ func (rc *Client) RestoreFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.Import(ectx, filesReplica, rewriteRules)
return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher)
})
}

@@ -768,7 +786,7 @@ func (rc *Client) RestoreRaw(
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule())
return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
})
}
if err := eg.Wait(); err != nil {
@@ -844,6 +862,8 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
gctx,
store.GetAddress(),
opt,
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
// we don't need to set keepalive timeout here, because the connection lives
// at most 5s. (shorter than minimal value for keepalive time!)
@@ -880,17 +900,25 @@ func (rc *Client) GoValidateChecksum(
) <-chan struct{} {
log.Info("Start to validate checksum")
outCh := make(chan struct{}, 1)
wg := new(sync.WaitGroup)
wg.Add(2)
loadStatCh := make(chan *CreatedTable, 1024)
// run the stat loader
go func() {
defer wg.Done()
rc.updateMetaAndLoadStats(ctx, loadStatCh)
}()
workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go func() {
wg, ectx := errgroup.WithContext(ctx)
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Info("all checksum ended")
if err := wg.Wait(); err != nil {
if err := eg.Wait(); err != nil {
errCh <- err
}
outCh <- struct{}{}
close(outCh)
close(loadStatCh)
wg.Done()
}()

for {
select {
// if we use ectx here, maybe canceled will mask real error.
@@ -900,14 +928,14 @@
if !ok {
return
}
workers.ApplyOnErrorGroup(wg, func() error {

workers.ApplyOnErrorGroup(eg, func() error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectDuration("restore checksum", elapsed)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execChecksum(ectx, tbl, kvClient, concurrency)
err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh)
if err != nil {
return errors.Trace(err)
}
@@ -917,10 +945,21 @@ func (rc *Client) GoValidateChecksum(
}
}
}()
go func() {
wg.Wait()
log.Info("all checksum ended")
close(outCh)
}()
return outCh
}

func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint) error {
func (rc *Client) execChecksum(
ctx context.Context,
tbl CreatedTable,
kvClient kv.Client,
concurrency uint,
loadStatCh chan<- *CreatedTable,
) error {
logger := log.With(
zap.String("db", tbl.OldTable.DB.Name.O),
zap.String("table", tbl.OldTable.Info.Name.O),
Expand Down Expand Up @@ -969,16 +1008,49 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
if table.Stats != nil {
logger.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
logger.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))

loadStatCh <- &tbl
return nil
}

func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
return
case tbl, ok := <-input:
if !ok {
return
}

// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTS(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
}

table := tbl.OldTable
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
}
}
}
return nil
}
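
// Sketch (not part of the actual diff): GoValidateChecksum and
// updateMetaAndLoadStats above now form a producer/consumer pair. The checksum
// workers push each validated CreatedTable into loadStatCh, and a single
// goroutine drains the channel to update stats meta and load the JSON stats,
// so stats loading no longer blocks the checksum workers. The coordination
// shape, with hypothetical helper names:
//
//	wg := new(sync.WaitGroup)
//	wg.Add(2)
//	loadStatCh := make(chan *CreatedTable, 1024)
//	go func() { defer wg.Done(); drainAndLoadStats(ctx, loadStatCh) }()                     // consumer
//	go func() { defer wg.Done(); runChecksumWorkers(ctx, loadStatCh); close(loadStatCh) }() // producers
//	go func() { wg.Wait(); close(outCh) }()                                                 // completion signal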

const (
@@ -1155,6 +1227,24 @@ func (rc *Client) IsSkipCreateSQL() bool {
return rc.noSchema
}

// DDLJobsMap returns a map[UniqueTableName]bool that records, per <db, table>, whether a create/truncate DDL job was replayed.
// If some DDLs are executed before the tables are created, there are two situations that require rebasing the auto increment/random ID:
// 1. truncate table: truncation generates a new ID cache.
// 2. create table / create and rename table: the first create table locks down the ID cache,
//    and because we cannot create a table with onExistReplace,
//    the final create DDL carrying the correct auto increment/random ID won't be executed.
func (rc *Client) DDLJobsMap() map[UniqueTableName]bool {
m := make(map[UniqueTableName]bool)
for _, job := range rc.ddlJobs {
switch job.Type {
case model.ActionTruncateTable, model.ActionCreateTable, model.ActionRenameTable:
m[UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()}] = true
}
}
return m
}
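
// Sketch (not part of the actual diff): DDLJobsMap is consumed by the
// table-creation path shown earlier in this file — GoCreateTables computes it
// once and passes it to every createTable call as the ddlTables argument, so
// db.CreateTable can decide whether the auto increment/random ID of a table
// whose create/truncate DDL was already replayed needs to be rebased:
//
//	ddlTables := rc.DDLJobsMap()
//	...
//	rt, err := rc.createTable(c, db, dom, t, newTS, ddlTables)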

// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node.
func (rc *Client) PreCheckTableTiFlashReplica(
ctx context.Context,