Skip to content

Commit

Permalink
Merge branch 'release-7.1' into cherry-pick-51750-to-release-7.1
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Apr 16, 2024
2 parents 221270a + 5bf91c6 commit 90fc497
Show file tree
Hide file tree
Showing 102 changed files with 1,462 additions and 448 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func BuildBackupSchemas(
// Treat cached table as normal table.
tableInfo.TableCacheStatusType = model.TableCacheStatusDisable

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
if tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = autoIDAccess.RandomID().Get()
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type CheckCtx struct {

// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
// FetchRemoteDBModels obtains the models of all databases. Currently, only
// the database name is filled.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//tablecodec",
"//types",
"//util/chunk",
"//util/codec",
"//util/mathutil",
"//util/topsql/stmtstats",
"@com_github_docker_go_units//:go-units",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64
kvPairs := e.SessionCtx.TakeKvPairs()
for i := 0; i < len(kvPairs.Pairs); i++ {
var encoded [9]byte // The max length of encoded int64 is 9.
kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
kvPairs.Pairs[i].RowID = codec.EncodeComparableVarint(encoded[:0], rowID)
}
e.recordCache = record[:0]
return kvPairs, nil
Expand Down
23 changes: 1 addition & 22 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ import (

// Iter abstract iterator method for Ingester.
type Iter interface {
// Seek seek to specify position.
// if key not found, seeks next key position in iter.
Seek(key []byte) bool
// Error return current error on this iter.
Error() error
// First moves this iter to the first key.
Expand Down Expand Up @@ -88,15 +85,6 @@ type DupDetectOpt struct {
ReportErrOnDup bool
}

func (d *dupDetectIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
d.fill()
return d.err == nil
}

func (d *dupDetectIter) First() bool {
if d.err != nil || !d.iter.First() {
return false
Expand Down Expand Up @@ -155,7 +143,7 @@ func (d *dupDetectIter) Next() bool {
}
if d.option.ReportErrOnDup {
dupKey := make([]byte, len(d.curKey))
dupVal := make([]byte, len(d.iter.Value()))
dupVal := make([]byte, len(d.curVal))
copy(dupKey, d.curKey)
copy(dupVal, d.curVal)
d.err = common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal)
Expand Down Expand Up @@ -225,15 +213,6 @@ type dupDBIter struct {
err error
}

func (d *dupDBIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Error() error {
if d.err != nil {
return d.err
Expand Down
49 changes: 0 additions & 49 deletions br/pkg/lightning/backend/local/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,55 +178,6 @@ func TestDupDetectIterator(t *testing.T) {
}
}

func TestDupDetectIterSeek(t *testing.T) {
pairs := []common.KvPair{
{
Key: []byte{1, 2, 3, 0},
Val: randBytes(128),
RowID: common.EncodeIntRowID(1),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
RowID: common.EncodeIntRowID(2),
},
{
Key: []byte{1, 2, 3, 1},
Val: randBytes(128),
RowID: common.EncodeIntRowID(3),
},
{
Key: []byte{1, 2, 3, 2},
Val: randBytes(128),
RowID: common.EncodeIntRowID(4),
},
}

storeDir := t.TempDir()
db, err := pebble.Open(filepath.Join(storeDir, "kv"), &pebble.Options{})
require.NoError(t, err)

keyAdapter := dupDetectKeyAdapter{}
wb := db.NewBatch()
for _, p := range pairs {
key := keyAdapter.Encode(nil, p.Key, p.RowID)
require.NoError(t, wb.Set(key, p.Val, nil))
}
require.NoError(t, wb.Commit(pebble.Sync))

dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
require.NoError(t, err)
iter := newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), DupDetectOpt{})

require.True(t, iter.Seek([]byte{1, 2, 3, 1}))
require.Equal(t, pairs[1].Val, iter.Value())
require.True(t, iter.Next())
require.Equal(t, pairs[3].Val, iter.Value())
require.NoError(t, iter.Close())
require.NoError(t, db.Close())
require.NoError(t, dupDB.Close())
}

func TestKeyAdapterEncoding(t *testing.T) {
keyAdapter := dupDetectKeyAdapter{}
srcKey := []byte{1, 2, 3}
Expand Down
8 changes: 2 additions & 6 deletions br/pkg/lightning/backend/local/key_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
package local

import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/util/codec"
)

Expand Down Expand Up @@ -102,8 +99,7 @@ func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {

var _ KeyAdapter = dupDetectKeyAdapter{}

// static vars for rowID
var (
MinRowID = common.EncodeIntRowID(math.MinInt64)
ZeroRowID = common.EncodeIntRowID(0)
// MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode().
MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0}
)
57 changes: 55 additions & 2 deletions br/pkg/lightning/backend/local/key_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"math"
"sort"
"testing"
"time"
"unsafe"

"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
)

Expand All @@ -35,8 +40,9 @@ func randBytes(n int) []byte {
func TestNoopKeyAdapter(t *testing.T) {
keyAdapter := noopKeyAdapter{}
key := randBytes(32)
require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
rowID := randBytes(8)
require.Len(t, key, keyAdapter.EncodedLen(key, rowID))
encodedKey := keyAdapter.Encode(nil, key, rowID)
require.Equal(t, key, encodedKey)

decodedKey, err := keyAdapter.Decode(nil, encodedKey)
Expand Down Expand Up @@ -160,3 +166,50 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
require.Equal(t, key, buf2[4:])
}
}

func TestMinRowID(t *testing.T) {
keyApapter := dupDetectKeyAdapter{}
key := []byte("key")
val := []byte("val")
shouldBeMin := keyApapter.Encode(key, val, MinRowID)

rowIDs := make([][]byte, 0, 20)

// DDL

rowIDs = append(rowIDs, kv.IntHandle(math.MinInt64).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(-1).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(0).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(math.MaxInt64).Encoded())
handleData := []types.Datum{
types.NewIntDatum(math.MinInt64),
types.NewIntDatum(-1),
types.NewIntDatum(0),
types.NewIntDatum(math.MaxInt64),
types.NewBytesDatum(make([]byte, 1)),
types.NewBytesDatum(make([]byte, 7)),
types.NewBytesDatum(make([]byte, 8)),
types.NewBytesDatum(make([]byte, 9)),
types.NewBytesDatum(make([]byte, 100)),
}
for _, d := range handleData {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
encodedKey, err := codec.EncodeKey(sc, nil, d)
require.NoError(t, err)
ch, err := kv.NewCommonHandle(encodedKey)
require.NoError(t, err)
rowIDs = append(rowIDs, ch.Encoded())
}

// lightning, IMPORT INTO, ...

numRowIDs := []int64{math.MinInt64, -1, 0, math.MaxInt64}
for _, id := range numRowIDs {
rowIDs = append(rowIDs, codec.EncodeComparableVarint(nil, id))
}

for _, id := range rowIDs {
bs := keyApapter.Encode(key, val, id)
require.True(t, bytes.Compare(bs, shouldBeMin) >= 0)
}
}
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.T
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls)
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down
32 changes: 32 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
results := []*model.DBInfo{}
logger := log.FromContext(ctx)
s := common.SQLWithRetry{
DB: b.db,
Logger: logger,
}
err := s.Transact(ctx, "fetch db models", func(_ context.Context, tx *sql.Tx) error {
results = results[:0]

rows, e := tx.Query("SHOW DATABASES")
if e != nil {
return e
}
defer rows.Close()

for rows.Next() {
var dbName string
if e := rows.Scan(&dbName); e != nil {
return e
}
dbInfo := &model.DBInfo{
Name: model.NewCIStr(dbName),
}
results = append(results, dbInfo)
}
return rows.Err()
})
return results, err
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `backend.TargetInfoGetter` interface.
// TODO: refactor
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const WholeTableEngineID = math.MaxInt32
// remember to increase the version number in case of incompatible change.
const (
CheckpointTableNameTask = "task_v2"
CheckpointTableNameTable = "table_v8"
CheckpointTableNameTable = "table_v9"
CheckpointTableNameEngine = "engine_v5"
CheckpointTableNameChunk = "chunk_v5"
)
Expand Down Expand Up @@ -106,7 +106,7 @@ const (
status tinyint unsigned DEFAULT 30,
alloc_base bigint NOT NULL DEFAULT 0,
table_id bigint NOT NULL DEFAULT 0,
table_info text NOT NULL,
table_info longtext NOT NULL,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
kv_bytes bigint unsigned NOT NULL DEFAULT 0,
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_time//rate",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_multierr//:multierr",
],
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var retryableErrorMsgList = []string{
// this error happens on when distsql.Checksum calls TiKV
// see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941
"coprocessor task terminated due to exceeding the deadline",
// fix https://github.com/pingcap/tidb/issues/51383
"rate: wait",
}

func isRetryableFromErrorMessage(err error) bool {
Expand Down
11 changes: 11 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"net"
"net/url"
"testing"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/errno"
drivererr "github.com/pingcap/tidb/store/driver/error"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -117,4 +119,13 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.New("other error: Coprocessor task terminated due to exceeding the deadline")))

// error from limiter
l := rate.NewLimiter(rate.Limit(1), 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// context has 1 second timeout, can't wait for 10 seconds
err = l.WaitN(ctx, 10)
require.Error(t, err)
require.True(t, IsRetryableError(err))
}
11 changes: 7 additions & 4 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,16 @@ type KvPair struct {
Key []byte
// Val is the value of the KV pair
Val []byte
// RowID is the row id of the KV pair.
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has
// two sources:
//
// When the KvPair is generated from ADD INDEX, the RowID is the encoded handle.
//
// Otherwise, the RowID is related to the row number in the source files, and
// encode the number with `codec.EncodeComparableVarint`.
RowID []byte
}

// EncodeIntRowIDToBuf encodes an int64 row id to a buffer.
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint

// EncodeIntRowID encodes an int64 row id.
func EncodeIntRowID(rowID int64) []byte {
return codec.EncodeComparableVarint(nil, rowID)
Expand Down
Loading

0 comments on commit 90fc497

Please sign in to comment.