Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb into topsql-…
Browse files Browse the repository at this point in the history
…catch-running1
  • Loading branch information
crazycs520 committed Apr 26, 2022
2 parents 0dd6dab + 59566fa commit 3da532e
Show file tree
Hide file tree
Showing 63 changed files with 790 additions and 365 deletions.
32 changes: 27 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,15 +1051,18 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}
type singleMgrBuilder struct {
taskID int64
}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
pd: pd,
taskID: b.taskID,
}
}

Expand All @@ -1068,15 +1071,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
pd *pdutil.PdController
taskID int64
initialized bool
sourceBytes uint64
clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
m.sourceBytes = uint64(source)
m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
_, err := action(nil)
newTasks, err := action([]taskMeta{
{
taskID: m.taskID,
status: taskMetaStatusInitial,
sourceBytes: m.sourceBytes,
clusterAvail: m.clusterAvail,
},
})
for _, t := range newTasks {
if m.taskID == t.taskID {
m.sourceBytes = t.sourceBytes
m.clusterAvail = t.clusterAvail
}
}
return err
}

Expand All @@ -1085,7 +1107,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
return m.initialized, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql/driver"
"sort"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -358,3 +359,28 @@ func (t *testChecksumMgr) Checksum(ctx context.Context, tableInfo *checkpoints.T
t.callCnt++
return &t.checksum, nil
}

func TestSingleTaskMetaMgr(t *testing.T) {
metaBuilder := singleMgrBuilder{
taskID: time.Now().UnixNano(),
}
metaMgr := metaBuilder.TaskMetaMgr(nil)

ok, err := metaMgr.CheckTaskExist(context.Background())
require.NoError(t, err)
require.False(t, ok)

err = metaMgr.InitTask(context.Background(), 1<<30)
require.NoError(t, err)

ok, err = metaMgr.CheckTaskExist(context.Background())
require.NoError(t, err)
require.True(t, ok)

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
require.Len(t, tasks, 1)
require.Equal(t, uint64(1<<30), tasks[0].sourceBytes)
return nil, nil
})
require.NoError(t, err)
}
18 changes: 16 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ func NewRestoreControllerWithPauser(
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{}
metaBuilder = singleMgrBuilder{
taskID: cfg.TaskID,
}
default:
metaBuilder = noopMetaMgrBuilder{}
}
Expand Down Expand Up @@ -1928,7 +1930,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return common.ErrMetaMgrUnknown.Wrap(err).GenWithStackByArgs()
}
if rc.cfg.App.CheckRequirements {
}
if rc.cfg.App.CheckRequirements {
needCheck := true
if rc.cfg.Checkpoint.Enable {
taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
if err != nil {
return common.ErrReadCheckpoint.Wrap(err).GenWithStack("get task checkpoint failed")
}
// If task checkpoint is initialized, it means check has been performed before.
// We don't need and shouldn't check again, because lightning may have already imported some data.
needCheck = taskCheckpoints == nil
}
if needCheck {
err = rc.localResource(source)
if err != nil {
return common.ErrCheckLocalResource.Wrap(err).GenWithStackByArgs()
Expand Down
1 change: 0 additions & 1 deletion cmd/explaintest/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

lease = "0"
mem-quota-query = 34359738368
host = "127.0.0.1"
new_collations_enabled_on_first_bootstrap = true

Expand Down
1 change: 0 additions & 1 deletion cmd/explaintest/disable_new_collation.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

lease = "0"
mem-quota-query = 34359738368
host = "127.0.0.1"
new_collations_enabled_on_first_bootstrap = false

Expand Down
2 changes: 2 additions & 0 deletions cmd/explaintest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ func main() {
"set @@tidb_projection_concurrency=4",
"set @@tidb_distsql_scan_concurrency=15",
"set @@global.tidb_enable_clustered_index=0;",
"set @@global.tidb_mem_quota_query=34359738368",
"set @@tidb_mem_quota_query=34359738368",
}
for _, sql := range resets {
if _, err = mdb.Exec(sql); err != nil {
Expand Down
10 changes: 1 addition & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ type Config struct {
OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"`
OOMAction string `toml:"oom-action" json:"oom-action"`
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
Expand Down Expand Up @@ -630,7 +629,6 @@ var defaultConf = Config{
TempStorageQuota: -1,
TempStoragePath: tempStorageDirName,
OOMAction: OOMActionCancel,
MemQuotaQuery: 1 << 30,
EnableBatchDML: false,
CheckMb4ValueInUTF8: *NewAtomicBool(true),
MaxIndexLength: 3072,
Expand Down Expand Up @@ -805,6 +803,7 @@ var deprecatedConfig = map[string]struct{}{
"stmt-summary.max-sql-length": {},
"stmt-summary.refresh-interval": {},
"stmt-summary.history-size": {},
"mem-quota-query": {},
}

func isAllDeprecatedConfigItems(items []string) bool {
Expand All @@ -816,10 +815,6 @@ func isAllDeprecatedConfigItems(items []string) bool {
return true
}

// IsMemoryQuotaQuerySetByUser indicates whether the config item mem-quota-query
// is set by the user.
var IsMemoryQuotaQuerySetByUser bool

// IsOOMActionSetByUser indicates whether the config item mem-action is set by
// the user.
var IsOOMActionSetByUser bool
Expand Down Expand Up @@ -879,9 +874,6 @@ func (c *Config) Load(confFile string) error {
if c.TokenLimit == 0 {
c.TokenLimit = 1000
}
if metaData.IsDefined("mem-quota-query") {
IsMemoryQuotaQuerySetByUser = true
}
if metaData.IsDefined("oom-action") {
IsOOMActionSetByUser = true
}
Expand Down
9 changes: 3 additions & 6 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@ split-table = true
# The limit of concurrent executed sessions.
token-limit = 1000

# The maximum memory available for a single SQL statement. Default: 1GB
mem-quota-query = 1073741824

# Controls whether to enable the temporary storage for some operators when a single SQL statement exceeds the memory quota specified by mem-quota-query.
# Controls whether to enable the temporary storage for some operators when a single SQL statement exceeds the memory quota specified by the memory quota.
oom-use-tmp-storage = true

# Specifies the temporary storage path for some operators when a single SQL statement exceeds the memory quota specified by mem-quota-query.
# Specifies the temporary storage path for some operators when a single SQL statement exceeds the memory quota specified by the memory quota.
# It defaults to a generated directory in `<TMPDIR>/<os/user.Current().Uid>_tidb/` if it is unset.
# It only takes effect when `oom-use-tmp-storage` is `true`.
# tmp-storage-path = "/tmp/<os/user.Current().Uid>_tidb/MC4wLjAuMDo0MDAwLzAuMC4wLjA6MTAwODA=/tmp-storage"
Expand All @@ -47,7 +44,7 @@ oom-use-tmp-storage = true
# The default value of tmp-storage-quota is under 0 which means tidb-server wouldn't check the capacity.
tmp-storage-quota = -1

# Specifies what operation TiDB performs when a single SQL statement exceeds the memory quota specified by mem-quota-query and cannot be spilled over to disk.
# Specifies what operation TiDB performs when a single SQL statement exceeds the memory quota specified by the memory quota and cannot be spilled over to disk.
# Valid options: ["log", "cancel"]
oom-action = "cancel"

Expand Down
2 changes: 0 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ enable-batch-dml = true
server-version = "test_version"
repair-mode = true
max-server-connections = 200
mem-quota-query = 10000
max-index-length = 3080
index-limit = 70
table-column-count-limit = 4000
Expand Down Expand Up @@ -285,7 +284,6 @@ grpc-max-send-msg-size = 40960
require.True(t, conf.RepairMode)
require.Equal(t, uint64(16), conf.TiKVClient.ResolveLockLiteThreshold)
require.Equal(t, uint32(200), conf.MaxServerConnections)
require.Equal(t, int64(10000), conf.MemQuotaQuery)
require.Equal(t, []string{"tiflash"}, conf.IsolationRead.Engines)
require.Equal(t, 3080, conf.MaxIndexLength)
require.Equal(t, 70, conf.IndexLimit)
Expand Down
1 change: 0 additions & 1 deletion config/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ var (
"Performance.StmtCountLimit": {},
"Performance.TCPKeepAlive": {},
"OOMAction": {},
"MemQuotaQuery": {},
"TiKVClient.StoreLimit": {},
"Log.Level": {},
"Log.SlowThreshold": {},
Expand Down
4 changes: 1 addition & 3 deletions config/config_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestMergeConfigItems(t *testing.T) {
newConf.Performance.QueryFeedbackLimit = 123
newConf.Performance.PseudoEstimateRatio = 123
newConf.OOMAction = "panic"
newConf.MemQuotaQuery = 123
newConf.TiKVClient.StoreLimit = 123

// rejected
Expand All @@ -64,7 +63,7 @@ func TestMergeConfigItems(t *testing.T) {
newConf.Log.SlowThreshold = 2345

as, rs := MergeConfigItems(oldConf, newConf)
require.Equal(t, 10, len(as))
require.Equal(t, 9, len(as))
require.Equal(t, 3, len(rs))
for _, a := range as {
_, ok := dynamicConfigItems[a]
Expand All @@ -82,7 +81,6 @@ func TestMergeConfigItems(t *testing.T) {
require.Equal(t, newConf.Performance.QueryFeedbackLimit, oldConf.Performance.QueryFeedbackLimit)
require.Equal(t, newConf.Performance.PseudoEstimateRatio, oldConf.Performance.PseudoEstimateRatio)
require.Equal(t, newConf.OOMAction, oldConf.OOMAction)
require.Equal(t, newConf.MemQuotaQuery, oldConf.MemQuotaQuery)
require.Equal(t, newConf.TiKVClient.StoreLimit, oldConf.TiKVClient.StoreLimit)
require.Equal(t, newConf.Log.SlowThreshold, oldConf.Log.SlowThreshold)

Expand Down
15 changes: 8 additions & 7 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,16 +1049,17 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
}()
hook := &ddl.TestDDLCallback{Do: dom}
times := 0
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil ||
(job.SchemaState != startColState && job.SchemaState != commitColState) {
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != startColState {
return
}

if job.SchemaState == startColState {
tk1.MustExec("use test")
tk1.MustExec("begin pessimistic;")
tk1.MustExec("insert into t1 values(101, 102, 103)")
tk1.MustExec("use test")
tk1.MustExec("begin pessimistic;")
tk1.MustExec("insert into t1 values(101, 102, 103)")
}
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != commitColState {
return
}
if times == 0 {
Expand Down
66 changes: 16 additions & 50 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -678,66 +679,31 @@ func TestTableDDLWithTimeType(t *testing.T) {
}

func TestUpdateMultipleTable(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database umt_db")
tk.MustExec("use umt_db")
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int, c2 int)")
tk.MustExec("insert t1 values (1, 1), (2, 2)")
tk.MustExec("create table t2 (c1 int, c2 int)")
tk.MustExec("insert t2 values (1, 3), (2, 5)")
ctx := tk.Session()
dom := domain.GetDomain(ctx)
is := dom.InfoSchema()
db, ok := is.SchemaByName(model.NewCIStr("umt_db"))
require.True(t, ok)
t1Tbl, err := is.TableByName(model.NewCIStr("umt_db"), model.NewCIStr("t1"))
require.NoError(t, err)
t1Info := t1Tbl.Meta()

// Add a new column in write only state.
newColumn := &model.ColumnInfo{
ID: 100,
Name: model.NewCIStr("c3"),
Offset: 2,
DefaultValue: 9,
OriginDefaultValue: 9,
FieldType: *types.NewFieldType(mysql.TypeLonglong),
State: model.StateWriteOnly,
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

d := dom.DDL()
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.SchemaState == model.StateWriteOnly {
tk2.MustExec("update t1, t2 set t1.c1 = 8, t2.c2 = 10 where t1.c2 = t2.c1")
tk2.MustQuery("select * from t1").Check(testkit.Rows("8 1", "8 2"))
tk2.MustQuery("select * from t2").Check(testkit.Rows("1 10", "2 10"))
}
}
t1Info.Columns = append(t1Info.Columns, newColumn)

err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
_, err = m.GenSchemaVersion()
require.NoError(t, err)
require.Nil(t, m.UpdateTable(db.ID, t1Info))
return nil
})
require.NoError(t, err)
err = dom.Reload()
require.NoError(t, err)

tk.MustExec("update t1, t2 set t1.c1 = 8, t2.c2 = 10 where t1.c2 = t2.c1")
tk.MustQuery("select * from t1").Check(testkit.Rows("8 1", "8 2"))
tk.MustQuery("select * from t2").Check(testkit.Rows("1 10", "2 10"))
d.SetHook(hook)

newColumn.State = model.StatePublic

err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
_, err = m.GenSchemaVersion()
require.NoError(t, err)
require.Nil(t, m.UpdateTable(db.ID, t1Info))
return nil
})
require.NoError(t, err)
err = dom.Reload()
require.NoError(t, err)
tk.MustExec("alter table t1 add column c3 bigint default 9")

tk.MustQuery("select * from t1").Check(testkit.Rows("8 1 9", "8 2 9"))
tk.MustExec("drop database umt_db")
}

func TestNullGeneratedColumn(t *testing.T) {
Expand Down
Loading

0 comments on commit 3da532e

Please sign in to comment.