Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support null value for auto-incr column on local backend (#34552) #34595

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 68 additions & 29 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,50 +356,27 @@ func (kvcodec *tableKVEncoder) Encode(
}

meta := kvcodec.tbl.Meta()
isAutoRandom := meta.PKIsHandle && meta.ContainsAutoRandomBits()
for i, col := range cols {
var theDatum *types.Datum = nil
j := columnPermutation[i]
isAutoIncCol := mysql.HasAutoIncrementFlag(col.Flag)
isPk := mysql.HasPriKeyFlag(col.Flag)
switch {
case j >= 0 && j < len(row):
value, err = table.CastValue(kvcodec.se, row[j], col.ToInfo(), false, false)
if err == nil {
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
}
case isAutoIncCol:
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isAutoRandom && isPk:
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.Flag) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
if j >= 0 && j < len(row) {
theDatum = &row[j]
}
value, err = kvcodec.getActualDatum(rowID, i, theDatum)
if err != nil {
return nil, logKVConvertFailed(logger, row, j, col.ToInfo(), err)
}

record = append(record, value)

if isAutoRandom && isPk {
if isTableAutoRandom(meta) && isPKCol(col.ToInfo()) {
incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits))
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false); err != nil {
return nil, errors.Trace(err)
}
}
if isAutoIncCol {
if isAutoIncCol(col.ToInfo()) {
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoIncrementType)
if err := alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -450,6 +427,68 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

func isTableAutoRandom(tblMeta *model.TableInfo) bool {
return tblMeta.PKIsHandle && tblMeta.ContainsAutoRandomBits()
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.Flag)
}

func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.Flag)
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
err error
)

tblMeta := kvcodec.tbl.Meta()
cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
// we can assume that the `colIndex` always have a valid input
col := cols[colIndex]

isBadNullValue := false
if inputDatum != nil {
value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false)
if err != nil {
return value, err
}
if err := col.CheckNotNull(&value); err == nil {
return value, nil // the most normal case
}
isBadNullValue = true
}
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.Flag) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
return value, err
}

// get record value for auto-increment field
//
// See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852
Expand Down
92 changes: 88 additions & 4 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,17 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
},
})
require.NoError(t, err)
pairs, err := encoder.Encode(logger, []types.Datum{
types.NewStringDatum("1"),

strDatumForID := types.NewStringDatum("1")
actualDatum, err := encoder.(*tableKVEncoder).getActualDatum(70, 0, &strDatumForID)
require.NoError(t, err)
require.Equal(t, types.NewFloat64Datum(1.0), actualDatum)

pairsExpect, err := encoder.Encode(logger, []types.Datum{
types.NewFloat64Datum(1.0),
}, 70, []int{0, -1}, "1.csv", 1234)
require.NoError(t, err)
require.Equal(t, pairs, &KvPairs{pairs: []common.KvPair{
require.Equal(t, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Expand All @@ -337,10 +343,88 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
},
}})
}}, pairsExpect)

pairs, err := encoder.Encode(logger, []types.Datum{
types.NewStringDatum("1"),
}, 70, []int{0, -1}, "1.csv", 1234)
require.NoError(t, err)

require.Equal(t, pairsExpect, pairs)
require.Equal(t, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoIncrementType).Base(), int64(70))
}

func TestEncodeMissingAutoValue(t *testing.T) {
logger := log.Logger{Logger: zap.NewNop()}

var rowID int64 = 70
type testTableInfo struct {
AllocType autoid.AllocatorType
CreateStmt string
}

for _, testTblInfo := range []testTableInfo{
{
AllocType: autoid.AutoIncrementType,
CreateStmt: "create table t (id integer primary key auto_increment);",
},
{
AllocType: autoid.AutoRandomType,
CreateStmt: "create table t (id integer primary key auto_random(3));",
},
} {
tblInfo := mockTableInfo(t, testTblInfo.CreateStmt)
if testTblInfo.AllocType == autoid.AutoRandomType {
// seems parser can't parse auto_random properly.
tblInfo.AutoRandomBits = 3
}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
require.NoError(t, err)

encoder, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
})
require.NoError(t, err)

realRowID := encoder.(*tableKVEncoder).autoIDFn(rowID)

var nullDatum types.Datum
nullDatum.SetNull()

expectIDDatum := types.NewIntDatum(realRowID)
actualIDDatum, err := encoder.(*tableKVEncoder).getActualDatum(rowID, 0, nil)
require.NoError(t, err)
require.Equal(t, expectIDDatum, actualIDDatum)

actualIDDatum, err = encoder.(*tableKVEncoder).getActualDatum(rowID, 0, &nullDatum)
require.NoError(t, err)
require.Equal(t, expectIDDatum, actualIDDatum)

pairsExpect, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(realRowID),
}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)

// test insert a NULL value on auto_xxxx column, and it is set to NOT NULL
pairs, err := encoder.Encode(logger, []types.Datum{
nullDatum,
}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)
require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo)
require.Equalf(t, rowID, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo)

// test insert a row without specifying the auto_xxxx column
pairs, err = encoder.Encode(logger, []types.Datum{}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)
require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo)
require.Equalf(t, rowID, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo)

}
}

func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo {
parser := parser.New()
node, err := parser.ParseOneStmt(createSQL, "", "")
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_auto_columns/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "local"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create schema lightning_auto_cols;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE t_auto_incr (
id bigint PRIMARY KEY AUTO_INCREMENT,
c char(40) NOT NULL DEFAULT '');
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO t_auto_incr (id, c) VALUES
(1, 'normal_pk_01');
INSERT INTO t_auto_incr VALUES
(NULL, 'null_pk_02');
INSERT INTO t_auto_incr VALUES
(NULL, 'null_pk_03');
INSERT INTO t_auto_incr (id, c) VALUES
(4, 'normal_pk_04');
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE t_auto_random (
id bigint PRIMARY KEY AUTO_RANDOM(3),
c char(40) NOT NULL DEFAULT '');
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO t_auto_random (id, c) VALUES
(1, 'normal_pk_01');
INSERT INTO t_auto_random VALUES
(NULL, 'null_pk_02');
INSERT INTO t_auto_random VALUES
(NULL, 'null_pk_03');
INSERT INTO t_auto_random (id, c) VALUES
(4, 'normal_pk_04');
38 changes: 38 additions & 0 deletions br/tests/lightning_auto_columns/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

run_sql 'DROP DATABASE IF EXISTS lightning_auto_cols;'
run_lightning

run_sql "SELECT CONCAT_WS(':', id, c) AS row_data FROM lightning_auto_cols.t_auto_incr;"
check_contains "row_data: 1:normal_pk_01"
check_contains "row_data: 2:null_pk_02"
check_contains "row_data: 3:null_pk_03"
check_contains "row_data: 4:normal_pk_04"
run_sql "SELECT COUNT(*) AS row_count FROM lightning_auto_cols.t_auto_incr;"
check_contains "row_count: 4"

run_sql "SELECT CONCAT_WS(':', id, c) AS row_data FROM lightning_auto_cols.t_auto_random;"
check_contains "row_data: 1:normal_pk_01"
check_contains ":null_pk_02"
check_not_contains "row_data: 0:null_pk_02"
check_contains ":null_pk_03"
check_not_contains "row_data: 0:null_pk_03"
check_contains "row_data: 4:normal_pk_04"
run_sql "SELECT COUNT(*) AS row_count FROM lightning_auto_cols.t_auto_random;"
check_contains "row_count: 4"