Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into wenxuan/vector-merg…
Browse files Browse the repository at this point in the history
…e-upstream

* origin/master:
  parser:  call `SetText` correctly for `CreateViewStmt` (#55520)
  lighting: expose limited fields for `kv.Session` (#55517)
  show: prefilter table name with like pattern and show full tables (#55396)
  Planner: Do not allow cardinality to go below 1 (#55242)
  *: replace maps.Copy which is for map clone with std maps.Clone (#55530)
  infoschema: add WithRefillOption for TableByName and TableByID (#55511)
  domain: change groupSize in splitForConcurrentFetch (#55518)
  ddl: fix resuming to wrong checkpoint when failed during adding index (#55506)
  • Loading branch information
breezewish committed Aug 21, 2024
2 parents ba4064d + 3464dae commit 80d9f5f
Show file tree
Hide file tree
Showing 82 changed files with 858 additions and 607 deletions.
1 change: 0 additions & 1 deletion lightning/pkg/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ go_library(
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"database/sql"
"fmt"
"io"
"maps"
"strings"

mysql_sql_driver "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -52,7 +53,6 @@ import (
"github.com/pingcap/tidb/pkg/util/mock"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// compressionRatio is the tikv/tiflash's compression ratio
Expand Down
4 changes: 2 additions & 2 deletions lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ func checkFieldCompatibility(
values []types.Datum,
logger log.Logger,
) bool {
se := kv.NewSessionCtx(&encode.SessionOptions{
se := kv.NewSession(&encode.SessionOptions{
SQLMode: mysql.ModeStrictTransTables,
}, logger)
for i, col := range tbl.Columns {
Expand All @@ -1307,7 +1307,7 @@ func checkFieldCompatibility(
if i >= len(values) {
break
}
_, err := table.CastValue(se, values[i], col, true, false)
_, err := table.CastColumnValue(se.GetExprCtx(), values[i], col, true, false)
if err != nil {
logger.Error("field value is not consistent with column type", zap.String("value", values[i].GetString()),
zap.Any("column_info", col), zap.Error(err))
Expand Down
18 changes: 12 additions & 6 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,11 @@ func loadTableRanges(
zap.Int64("physicalTableID", t.GetPhysicalID()))
return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil
}
failpoint.Inject("setLimitForLoadTableRanges", func(val failpoint.Value) {
if v, ok := val.(int); ok {
limit = v
}
})

rc := s.GetRegionCache()
maxSleep := 10000 // ms
Expand All @@ -466,6 +471,12 @@ func loadTableRanges(
if err != nil {
return false, errors.Trace(err)
}
var mockErr bool
failpoint.InjectCall("beforeLoadRangeFromPD", &mockErr)
if mockErr {
return false, kv.ErrTxnRetryable
}

ranges = make([]kv.KeyRange, 0, len(rs))
for _, r := range rs {
ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()})
Expand Down Expand Up @@ -636,12 +647,7 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
return decodeColMap, nil
}

var backfillTaskChanSize = 128

// SetBackfillTaskChanSizeForTest is only used for test.
func SetBackfillTaskChanSizeForTest(n int) {
backfillTaskChanSize = n
}
const backfillTaskChanSize = 128

func (dc *ddlCtx) runAddIndexInLocalIngestMode(
ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
if src.cpMgr == nil {
return start, false
}
cpKey := src.cpMgr.LastProcessedKey()
cpKey := src.cpMgr.NextKeyToProcess()
if len(cpKey) == 0 {
return start, false
}
Expand All @@ -364,7 +364,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
if cpKey.Cmp(end) == 0 {
return cpKey, true
}
return cpKey.Next(), false
return cpKey, false
}

func (src *TableScanTaskSource) generateTasks() error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func checkTableForeignKeysValid(sctx sessionctx.Context, is infoschema.InfoSchem
}

referredFKInfos := is.GetTableReferredForeignKeys(schema, tbInfo.Name.L)
ctx := infoschema.WithRefillOption(context.Background(), false)
for _, referredFK := range referredFKInfos {
childTable, err := is.TableByName(context.Background(), referredFK.ChildSchema, referredFK.ChildTable)
childTable, err := is.TableByName(ctx, referredFK.ChildSchema, referredFK.ChildTable)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool {
return s.localDataIsValid && len(s.flushedKeyLowWatermark) > 0 && end.Cmp(s.flushedKeyLowWatermark) <= 0
}

// LastProcessedKey finds the last processed key in checkpoint.
// If there is no processed key, it returns nil.
func (s *CheckpointManager) LastProcessedKey() kv.Key {
// NextKeyToProcess finds the next unprocessed key in checkpoint.
// If there is no such key, it returns nil.
func (s *CheckpointManager) NextKeyToProcess() kv.Key {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
1 change: 0 additions & 1 deletion pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//metadata",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"container/heap"
"context"
"fmt"
"maps"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -46,7 +47,6 @@ import (
"github.com/tikv/client-go/v2/tikv"
clientutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

var (
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_library(
"//pkg/util/globalconn",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/memoryusagealarm",
"//pkg/util/printer",
Expand Down
25 changes: 14 additions & 11 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ import (
"github.com/pingcap/tidb/pkg/util/globalconn"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/memoryusagealarm"
"github.com/pingcap/tidb/pkg/util/replayer"
Expand Down Expand Up @@ -462,20 +463,22 @@ func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, erro
const fetchSchemaConcurrency = 1

func (*Domain) splitForConcurrentFetch(schemas []*model.DBInfo) [][]*model.DBInfo {
groupSize := (len(schemas) + fetchSchemaConcurrency - 1) / fetchSchemaConcurrency
if variable.SchemaCacheSize.Load() > 0 && len(schemas) > 1000 {
groupCnt := fetchSchemaConcurrency
schemaCnt := len(schemas)
if variable.SchemaCacheSize.Load() > 0 && schemaCnt > 1000 {
// TODO: Temporary solution to speed up when too many databases, will refactor it later.
groupSize = 8
groupCnt = 8
}
splitted := make([][]*model.DBInfo, 0, fetchSchemaConcurrency)
schemaCnt := len(schemas)
for i := 0; i < schemaCnt; i += groupSize {
end := i + groupSize
if end > schemaCnt {
end = schemaCnt
}
splitted = append(splitted, schemas[i:end])

splitted := make([][]*model.DBInfo, 0, groupCnt)
groupSizes := mathutil.Divide2Batches(schemaCnt, groupCnt)

start := 0
for _, groupSize := range groupSizes {
splitted = append(splitted, schemas[start:start+groupSize])
start += groupSize
}

return splitted
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,16 +440,17 @@ func dumpMeta(zw *zip.Writer) error {
return nil
}

func dumpTiFlashReplica(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
func dumpTiFlashReplica(sctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
bf, err := zw.Create(PlanReplayerTiFlashReplicasFile)
if err != nil {
return errors.AddStack(err)
}
is := GetDomain(ctx).InfoSchema()
is := GetDomain(sctx).InfoSchema()
ctx := infoschema.WithRefillOption(context.Background(), false)
for pair := range pairs {
dbName := model.NewCIStr(pair.DBName)
tableName := model.NewCIStr(pair.TableName)
t, err := is.TableByName(context.Background(), dbName, tableName)
t, err := is.TableByName(ctx, dbName, tableName)
if err != nil {
logutil.BgLogger().Warn("failed to find table info", zap.Error(err),
zap.String("dbName", dbName.L), zap.String("tableName", tableName.L))
Expand Down Expand Up @@ -496,11 +497,12 @@ func dumpSchemaMeta(zw *zip.Writer, tables map[tableNamePair]struct{}) error {
func dumpStatsMemStatus(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *Domain) error {
statsHandle := do.StatsHandle()
is := do.InfoSchema()
ctx := infoschema.WithRefillOption(context.Background(), false)
for pair := range pairs {
if pair.IsView {
continue
}
tbl, err := is.TableByName(context.Background(), model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
tbl, err := is.TableByName(ctx, model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ func (do *Domain) GetSessionCache() (map[string]string, error) {
do.sysVarCache.RLock()
defer do.sysVarCache.RUnlock()
// Perform a deep copy since this will be assigned directly to the session
newMap := make(map[string]string, len(do.sysVarCache.session))
maps.Copy(newMap, do.sysVarCache.session)
return newMap, nil
return maps.Clone(do.sysVarCache.session), nil
}

// GetGlobalVar gets an individual global var from the sysvar cache.
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction,
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx.GetExprCtx(), t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (e *ImportIntoExec) fillJobInfo(ctx context.Context, jobID int64, req *chun
}); err != nil {
return err
}
fillOneImportJobInfo(info, req, unknownImportedRowCount)
FillOneImportJobInfo(info, req, unknownImportedRowCount)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/context",
"//pkg/planner/core",
"//pkg/planner/util",
"//pkg/sessionctx",
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -1291,17 +1292,17 @@ func (p *Plan) IsGlobalSort() bool {
// CreateColAssignExprs creates the column assignment expressions using session context.
// RewriteAstExpr will write ast node in place(due to xxNode.Accept), but it doesn't change node content,
// so we sync it.
func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]expression.Expression, []contextutil.SQLWarn, error) {
func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) ([]expression.Expression, []contextutil.SQLWarn, error) {
e.colAssignMu.Lock()
defer e.colAssignMu.Unlock()
res := make([]expression.Expression, 0, len(e.ColumnAssignments))
allWarnings := []contextutil.SQLWarn{}
for _, assign := range e.ColumnAssignments {
newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(sctx.GetPlanCtx(), assign.Expr, nil, nil, false)
newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(planCtx, assign.Expr, nil, nil, false)
// col assign expr warnings is static, we should generate it for each row processed.
// so we save it and clear it here.
allWarnings = append(allWarnings, sctx.GetSessionVars().StmtCtx.GetWarnings()...)
sctx.GetSessionVars().StmtCtx.SetWarnings(nil)
allWarnings = append(allWarnings, planCtx.GetSessionVars().StmtCtx.GetWarnings()...)
planCtx.GetSessionVars().StmtCtx.SetWarnings(nil)
if err != nil {
return nil, nil, err
}
Expand Down
17 changes: 6 additions & 11 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql" //nolint: goimports
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -64,8 +63,8 @@ func NewTableKVEncoder(
return nil, err
}
// we need a non-nil TxnCtx to avoid panic when evaluating set clause
baseKVEncoder.SessionCtx.Vars.TxnCtx = new(variable.TransactionContext)
colAssignExprs, _, err := ti.CreateColAssignExprs(baseKVEncoder.SessionCtx)
baseKVEncoder.SessionCtx.SetTxnCtxNotNil()
colAssignExprs, _, err := ti.CreateColAssignExprs(baseKVEncoder.SessionCtx.GetPlanCtx())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,24 +94,20 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
return sessionVars.TxnCtx.TableDeltaMap[en.TableMeta().ID].ColSize
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
sessionVars := en.SessionCtx.GetSessionVars()
setVar := func(name string, col *types.Datum) {
// User variable names are not case-sensitive
// https://dev.mysql.com/doc/refman/8.0/en/user-variables.html
name = strings.ToLower(name)
if col == nil || col.IsNull() {
sessionVars.UnsetUserVar(name)
en.SessionCtx.UnsetUserVar(name)
} else {
sessionVars.SetUserVarVal(name, *col)
en.SessionCtx.SetUserVarVal(name, *col)
}
}

Expand Down Expand Up @@ -166,7 +161,7 @@ func (en *tableKVEncoder) getRow(vals []types.Datum, rowID int64) ([]types.Datum
row := make([]types.Datum, len(en.Columns))
hasValue := make([]bool, len(en.Columns))
for i := 0; i < len(en.insertColumns); i++ {
casted, err := table.CastValue(en.SessionCtx, vals[i], en.insertColumns[i].ToInfo(), false, false)
casted, err := table.CastColumnValue(en.SessionCtx.GetExprCtx(), vals[i], en.insertColumns[i].ToInfo(), false, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func initEncodeCommitWorkers(e *LoadDataWorker) (*encodeWorker, *commitWorker, e
if err2 != nil {
return nil, nil, err2
}
colAssignExprs, exprWarnings, err2 := e.controller.CreateColAssignExprs(insertValues.Ctx())
colAssignExprs, exprWarnings, err2 := e.controller.CreateColAssignExprs(insertValues.Ctx().GetPlanCtx())
if err2 != nil {
return nil, nil, err2
}
Expand Down
Loading

0 comments on commit 80d9f5f

Please sign in to comment.