Skip to content

Commit

Permalink
Merge branch 'master' into change-alter-no-ttl-grammar
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Nov 23, 2022
2 parents 885cfa5 + cf49466 commit 9cc5dc0
Show file tree
Hide file tree
Showing 32 changed files with 3,666 additions and 283 deletions.
13 changes: 11 additions & 2 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
}
})

failpoint.Inject("PrintStatus", func() {
defer func() {
finished, total := l.Status()
o.logger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total),
zap.Bool("equal", finished == total))
}()
})

if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
return common.ErrInvalidTLSConfig.Wrap(err)
}
Expand Down Expand Up @@ -504,8 +514,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

var procedure *restore.Controller

param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
Expand All @@ -516,6 +524,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
CheckpointName: o.checkpointName,
}

var procedure *restore.Controller
procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
if err != nil {
o.logger.Error("restore failed", log.ShortError(err))
Expand Down
23 changes: 21 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,15 @@ type Controller struct {
precheckItemBuilder *PrecheckItemBuilder
}

// LightningStatus provides the finished bytes and total bytes of the current task.
// It should keep the value after restart from checkpoint.
// When it is tidb backend, FinishedFileSize can be counted after chunk data is
// restored to tidb. When it is local backend it's counted after whole engine is
// imported.
// TotalFileSize may be an estimated value, so when the task is finished, it may
// not equal to FinishedFileSize.
type LightningStatus struct {
backend string
FinishedFileSize atomic.Int64
TotalFileSize atomic.Int64
}
Expand Down Expand Up @@ -353,6 +361,7 @@ func NewRestoreControllerWithPauser(
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
p.Status.backend = cfg.TikvImporter.Backend

var metaBuilder metaMgrBuilder
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
Expand Down Expand Up @@ -2427,8 +2436,13 @@ func (cr *chunkRestore) deliverLoop(
// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
// TODO: reproduce and find the root cause and fix it completely
if currOffset >= startOffset {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))

delta := currOffset - startOffset
if delta >= 0 {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
Expand All @@ -2441,6 +2455,11 @@ func (cr *chunkRestore) deliverLoop(
}
failpoint.Inject("SlowDownWriteRows", func() {
deliverLogger.Warn("Slowed down write rows")
finished := rc.status.FinishedFileSize.Load()
total := rc.status.TotalFileSize.Load()
deliverLogger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total))
})
failpoint.Inject("FailAfterWriteRows", nil)
// TODO: for local backend, we may save checkpoint more frequently, e.g. after written
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil {
if rc.status != nil && rc.status.backend == config.BackendLocal {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
Expand Down Expand Up @@ -406,6 +406,11 @@ func (tr *TableRestore) restoreEngine(
if err != nil {
return closedEngine, errors.Trace(err)
}
if rc.status != nil && rc.status.backend == config.BackendTiDB {
for _, chunk := range cp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
}
return closedEngine, nil
}

Expand Down Expand Up @@ -475,6 +480,9 @@ func (tr *TableRestore) restoreEngine(

// Restore table data
for chunkIndex, chunk := range cp.Chunks {
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
}
if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
continue
}
Expand Down
9 changes: 8 additions & 1 deletion br/tests/lightning_checkpoint_columns/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ echo "INSERT INTO tbl (j, i) VALUES (3, 1),(4, 2);" > "$DBPATH/cp_tsr.tbl.sql"
# Set the failpoint to kill the lightning instance as soon as one row is written
PKG="github.com/pingcap/tidb/br/pkg/lightning/restore"
export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)"
# Check after 1 row is written in tidb backend, the finished progress is updated
export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"

# Start importing the tables.
run_sql 'DROP DATABASE IF EXISTS cp_tsr'
Expand All @@ -40,11 +42,16 @@ set -e
run_sql 'SELECT count(*) FROM `cp_tsr`.tbl'
check_contains "count(*): 1"

# After FailAfterWriteRows, the finished bytes is 36 as the first row size
grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "finished=36"

# restart lightning from checkpoint, the second line should be written successfully
export GO_FAILPOINTS=
# also check after restart from checkpoint, final finished equals to total
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/PrintStatus=return()"
set +e
run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null
set -e

run_sql 'SELECT j FROM `cp_tsr`.tbl WHERE i = 2;'
check_contains "j: 4"
grep "PrintStatus Failpoint" "$TEST_DIR/lightning.log" | grep -q "equal=true"
2 changes: 2 additions & 0 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ func TestCancel(t *testing.T) {

// Prepare schema.
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t_partition;")
tk.MustExec(`create table t_partition (
c1 int, c2 int, c3 int
Expand Down
1 change: 1 addition & 0 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0")
tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON")
defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF")

Expand Down
2 changes: 2 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,8 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

Expand Down
2 changes: 2 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ func TestCommitTxnWithIndexChange(t *testing.T) {
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
Expand Down Expand Up @@ -1385,6 +1386,7 @@ func TestAmendTxnSavepointWithDDL(t *testing.T) {
tk.MustExec("use test;")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk2.MustExec("use test;")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")

prepareFn := func() {
Expand Down
5 changes: 5 additions & 0 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))")
})
}
Expand All @@ -328,6 +330,8 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
})
}
Expand Down Expand Up @@ -420,6 +424,7 @@ func TestPartitionAddIndexGC(t *testing.T) {
s := createFailDBSuite(t)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec(`create table partition_add_idx (
id int not null,
hired date not null
Expand Down
2 changes: 2 additions & 0 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func TestIndexChange(t *testing.T) {
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")

Expand Down
1 change: 0 additions & 1 deletion ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1;")

tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;",
"amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg")
Expand Down
3 changes: 3 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
originHook := dom.DDL().GetHook()

// Test cancel successfully.
Expand Down Expand Up @@ -1014,6 +1016,7 @@ func TestMultiSchemaChangeMixCancelled(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")

tk.MustExec("create table t (a int, b int, c int, index i1(c), index i2(c));")
tk.MustExec("insert into t values (1, 2, 3);")
Expand Down
2 changes: 2 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic"))
}()
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")

Expand Down
9 changes: 6 additions & 3 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ func TestDDLStatsInfo(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := domain.DDL()

tk := testkit.NewTestKit(t, store)
ctx := tk.Session()
dbInfo, err := testSchemaInfo(store, "test_stat")
require.NoError(t, err)
testCreateSchema(t, testkit.NewTestKit(t, store).Session(), d, dbInfo)
testCreateSchema(t, ctx, d, dbInfo)
tblInfo, err := testTableInfo(store, "t", 2)
require.NoError(t, err)
testCreateTable(t, testkit.NewTestKit(t, store).Session(), d, dbInfo, tblInfo)
ctx := testkit.NewTestKit(t, store).Session()
testCreateTable(t, ctx, d, dbInfo, tblInfo)
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions ddl/table_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestLockTableReadOnly(t *testing.T) {
tk1.MustExec("admin cleanup table lock t1")
tk2.MustExec("insert into t1 set a=1, b=2")

tk1.MustExec("set global tidb_ddl_enable_fast_reorg = 0")
tk1.MustExec("set tidb_enable_amend_pessimistic_txn = 1")
tk1.MustExec("begin pessimistic")
tk1.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2"))
Expand Down
3 changes: 2 additions & 1 deletion executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -187,7 +188,7 @@ func (e *AnalyzeColumnsExecV2) decodeSampleDataWithVirtualColumn(
}
}
}
err := FillVirtualColumnValue(fieldTps, virtualColIdx, schema, e.colsInfo, e.ctx, chk)
err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx, chk)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -190,7 +191,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.index++
}

err := FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema, e.columns, e.ctx, req)
err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, req)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,12 +920,12 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
useOuterToBuild: testCase.useOuterToBuild,
concurrency: uint(testCase.concurrency),
},
probeSideTupleFetcher: &probeSideTupleFetcher{
probeSideExec: outerExec,
},
probeWorkers: make([]probeWorker, testCase.concurrency),
concurrency: uint(testCase.concurrency),
buildKeys: joinKeys,
probeKeys: probeKeys,
buildSideExec: innerExec,
Expand All @@ -936,6 +936,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i].workerID = i
e.probeWorkers[i].sessCtx = e.ctx
e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx
e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false)
}
Expand Down
6 changes: 5 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,8 +1429,8 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
isOuterJoin: v.JoinType.IsOuterJoin(),
useOuterToBuild: v.UseOuterToBuild,
joinType: v.JoinType,
concurrency: v.Concurrency,
},
concurrency: v.Concurrency,
}
defaultValues := v.DefaultValues
lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec)
Expand Down Expand Up @@ -1484,9 +1484,13 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.probeWorkers = make([]probeWorker, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx
e.probeWorkers[i].workerID = i
e.probeWorkers[i].sessCtx = e.ctx
e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin)
}
e.hashJoinCtx.isNullAware = isNAJoin
executorCountHashJoinExec.Inc()

// We should use JoinKey to construct the type information using by hashing, instead of using the child's schema directly.
Expand Down
Loading

0 comments on commit 9cc5dc0

Please sign in to comment.