Skip to content

Commit

Permalink
lighting: expose limited fields for kv.Session (#55517)
Browse files Browse the repository at this point in the history
ref #53388
  • Loading branch information
lcwangchao authored Aug 20, 2024
1 parent cbdd12e commit 1f095a3
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 146 deletions.
4 changes: 2 additions & 2 deletions lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ func checkFieldCompatibility(
values []types.Datum,
logger log.Logger,
) bool {
se := kv.NewSessionCtx(&encode.SessionOptions{
se := kv.NewSession(&encode.SessionOptions{
SQLMode: mysql.ModeStrictTransTables,
}, logger)
for i, col := range tbl.Columns {
Expand All @@ -1307,7 +1307,7 @@ func checkFieldCompatibility(
if i >= len(values) {
break
}
_, err := table.CastValue(se, values[i], col, true, false)
_, err := table.CastColumnValue(se.GetExprCtx(), values[i], col, true, false)
if err != nil {
logger.Error("field value is not consistent with column type", zap.String("value", values[i].GetString()),
zap.Any("column_info", col), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction,
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx.GetExprCtx(), t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/context",
"//pkg/planner/core",
"//pkg/planner/util",
"//pkg/sessionctx",
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -1291,17 +1292,17 @@ func (p *Plan) IsGlobalSort() bool {
// CreateColAssignExprs creates the column assignment expressions using session context.
// RewriteAstExpr will write ast node in place(due to xxNode.Accept), but it doesn't change node content,
// so we sync it.
func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]expression.Expression, []contextutil.SQLWarn, error) {
func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) ([]expression.Expression, []contextutil.SQLWarn, error) {
e.colAssignMu.Lock()
defer e.colAssignMu.Unlock()
res := make([]expression.Expression, 0, len(e.ColumnAssignments))
allWarnings := []contextutil.SQLWarn{}
for _, assign := range e.ColumnAssignments {
newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(sctx.GetPlanCtx(), assign.Expr, nil, nil, false)
newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(planCtx, assign.Expr, nil, nil, false)
// col assign expr warnings is static, we should generate it for each row processed.
// so we save it and clear it here.
allWarnings = append(allWarnings, sctx.GetSessionVars().StmtCtx.GetWarnings()...)
sctx.GetSessionVars().StmtCtx.SetWarnings(nil)
allWarnings = append(allWarnings, planCtx.GetSessionVars().StmtCtx.GetWarnings()...)
planCtx.GetSessionVars().StmtCtx.SetWarnings(nil)
if err != nil {
return nil, nil, err
}
Expand Down
17 changes: 6 additions & 11 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql" //nolint: goimports
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -64,8 +63,8 @@ func NewTableKVEncoder(
return nil, err
}
// we need a non-nil TxnCtx to avoid panic when evaluating set clause
baseKVEncoder.SessionCtx.Vars.TxnCtx = new(variable.TransactionContext)
colAssignExprs, _, err := ti.CreateColAssignExprs(baseKVEncoder.SessionCtx)
baseKVEncoder.SessionCtx.SetTxnCtxNotNil()
colAssignExprs, _, err := ti.CreateColAssignExprs(baseKVEncoder.SessionCtx.GetPlanCtx())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,24 +94,20 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
return sessionVars.TxnCtx.TableDeltaMap[en.TableMeta().ID].ColSize
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
sessionVars := en.SessionCtx.GetSessionVars()
setVar := func(name string, col *types.Datum) {
// User variable names are not case-sensitive
// https://dev.mysql.com/doc/refman/8.0/en/user-variables.html
name = strings.ToLower(name)
if col == nil || col.IsNull() {
sessionVars.UnsetUserVar(name)
en.SessionCtx.UnsetUserVar(name)
} else {
sessionVars.SetUserVarVal(name, *col)
en.SessionCtx.SetUserVarVal(name, *col)
}
}

Expand Down Expand Up @@ -166,7 +161,7 @@ func (en *tableKVEncoder) getRow(vals []types.Datum, rowID int64) ([]types.Datum
row := make([]types.Datum, len(en.Columns))
hasValue := make([]bool, len(en.Columns))
for i := 0; i < len(en.insertColumns); i++ {
casted, err := table.CastValue(en.SessionCtx, vals[i], en.insertColumns[i].ToInfo(), false, false)
casted, err := table.CastColumnValue(en.SessionCtx.GetExprCtx(), vals[i], en.insertColumns[i].ToInfo(), false, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func initEncodeCommitWorkers(e *LoadDataWorker) (*encodeWorker, *commitWorker, e
if err2 != nil {
return nil, nil, err2
}
colAssignExprs, exprWarnings, err2 := e.controller.CreateColAssignExprs(insertValues.Ctx())
colAssignExprs, exprWarnings, err2 := e.controller.CreateColAssignExprs(insertValues.Ctx().GetPlanCtx())
if err2 != nil {
return nil, nil, err2
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ go_test(
"base_test.go",
"kv2sql_test.go",
"session_internal_test.go",
"session_test.go",
"sql2kv_test.go",
],
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 19,
shard_count = 18,
deps = [
"//pkg/ddl",
"//pkg/kv",
Expand Down
27 changes: 11 additions & 16 deletions pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -208,11 +207,7 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64

// AddRecord adds a record into encoder
func (e *BaseKVEncoder) AddRecord(record []types.Datum) (kv.Handle, error) {
txn, err := e.SessionCtx.Txn(true)
if err != nil {
return nil, err
}
return e.table.AddRecord(e.SessionCtx.GetTableCtx(), txn, record, table.DupKeyCheckSkip)
return e.table.AddRecord(e.SessionCtx.GetTableCtx(), e.SessionCtx.Txn(), record, table.DupKeyCheckSkip)
}

// TableAllocators returns the allocators of the table
Expand Down Expand Up @@ -258,8 +253,10 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
)

isBadNullValue := false
exprCtx := e.SessionCtx.GetExprCtx()
errCtx := exprCtx.GetEvalCtx().ErrCtx()
if inputDatum != nil {
value, err = table.CastValue(e.SessionCtx, *inputDatum, col.ToInfo(), false, false)
value, err = table.CastColumnValue(exprCtx, *inputDatum, col.ToInfo(), false, false)
if err != nil {
return value, err
}
Expand All @@ -272,7 +269,7 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
switch {
case IsAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(e.SessionCtx,
value, err = table.CastColumnValue(exprCtx,
types.NewIntDatum(rowID), col.ToInfo(), false, false)
case e.IsAutoRandomCol(col.ToInfo()):
var val types.Datum
Expand All @@ -282,21 +279,19 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false)
value, err = table.CastColumnValue(exprCtx, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(e.SessionCtx.Vars.StmtCtx.ErrCtx(), &value, 0)
err = col.HandleBadNull(errCtx, &value, 0)
default:
// copy from the following GetColDefaultValue function, when this is true it will use getColDefaultExprValue
if col.DefaultIsExpr {
// the expression rewriter requires a non-nil TxnCtx.
e.SessionCtx.Vars.TxnCtx = new(variable.TransactionContext)
defer func() {
e.SessionCtx.Vars.TxnCtx = nil
}()
deferFn := e.SessionCtx.SetTxnCtxNotNil()
defer deferFn()
}
value, err = table.GetColDefaultValue(e.SessionCtx.GetExprCtx(), col.ToInfo())
}
Expand Down Expand Up @@ -363,7 +358,7 @@ func (e *BaseKVEncoder) LogEvalGenExprFailed(row []types.Datum, colInfo *model.C

// TruncateWarns resets the warnings in session context.
func (e *BaseKVEncoder) TruncateWarns() {
e.SessionCtx.Vars.StmtCtx.TruncateWarnings(0)
e.SessionCtx.GetExprCtx().GetEvalCtx().TruncateWarnings(0)
}

func evalGeneratedColumns(se *Session, record []types.Datum, cols []*table.Column,
Expand All @@ -375,7 +370,7 @@ func evalGeneratedColumns(se *Session, record []types.Datum, cols []*table.Colum
if err != nil {
return col, err
}
value, err := table.CastValue(se, evaluated, col, false, false)
value, err := table.CastColumnValue(se.GetExprCtx(), evaluated, col, false, false)
if err != nil {
return col, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (t *TableKVDecoder) DecodeHandleFromIndex(indexInfo *model.IndexInfo, key,

// DecodeRawRowData decodes raw row data into a datum slice and a (columnID:columnValue) map.
func (t *TableKVDecoder) DecodeRawRowData(h kv.Handle, value []byte) ([]types.Datum, map[int64]types.Datum, error) {
return tables.DecodeRawRowData(t.se, t.tbl.Meta(), h, t.tbl.Cols(), value)
return tables.DecodeRawRowData(t.se.GetExprCtx(), t.tbl.Meta(), h, t.tbl.Cols(), value)
}

// DecodeRawRowDataAsStr decodes raw row data into a string.
Expand Down Expand Up @@ -92,6 +92,8 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]

var buffer []types.Datum
var indexBuffer []byte
evalCtx := t.se.GetExprCtx().GetEvalCtx()
ec, loc := evalCtx.ErrCtx(), evalCtx.Location()
for _, index := range indices {
// skip clustered PK
if index.Meta().Primary && isCommonHandle {
Expand All @@ -102,8 +104,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
if err != nil {
return err
}
sc := t.se.Vars.StmtCtx
iter := index.GenIndexKVIter(sc.ErrCtx(), sc.TimeZone(), indexValues, h, nil)
iter := index.GenIndexKVIter(ec, loc, indexValues, h, nil)
for iter.Valid() {
indexKey, _, _, err := iter.Next(indexBuffer, nil)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/lightning/backend/kv/kv2sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func TestIterRawIndexKeysClusteredPK(t *testing.T) {
require.NoError(t, err)

sctx := kv.NewSession(sessionOpts, log.L())
txn, err := sctx.Txn(true)
require.NoError(t, err)
txn := sctx.Txn()
handle, err := tbl.AddRecord(sctx.GetTableCtx(), txn, []types.Datum{types.NewIntDatum(1), types.NewIntDatum(2)})
require.NoError(t, err)
paris := sctx.TakeKvPairs()
Expand Down Expand Up @@ -94,7 +93,7 @@ func TestIterRawIndexKeysIntPK(t *testing.T) {
require.NoError(t, err)

sctx := kv.NewSession(sessionOpts, log.L())
txn, err := sctx.Txn(true)
txn := sctx.Txn()
require.NoError(t, err)
handle, err := tbl.AddRecord(sctx.GetTableCtx(), txn, []types.Datum{types.NewIntDatum(1), types.NewIntDatum(2)})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 1f095a3

Please sign in to comment.