
Commit

fix conflict
Signed-off-by: yisaer <[email protected]>
Yisaer committed Dec 30, 2021
2 parents 4938813 + 57ef62b commit e65c1df
Showing 166 changed files with 7,295 additions and 2,840 deletions.
11 changes: 11 additions & 0 deletions bindinfo/cache.go
@@ -103,6 +103,17 @@ func (br *BindRecord) HasUsingBinding() bool {
return false
}

// FindUsingBinding returns the binding that is currently in use.
// There is at most one such binding at any time.
func (br *BindRecord) FindUsingBinding() *Binding {
for _, binding := range br.Bindings {
if binding.Status == Using {
return &binding
}
}
return nil
}

// FindBinding finds the binding in the BindRecord that matches the given hint.
func (br *BindRecord) FindBinding(hint string) *Binding {
for i := range br.Bindings {
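For orientation, a minimal sketch of how a caller might use the new helper. bindinfo.BindRecord, Binding, and the Using status come from the diff above; the BindSQL field and the wrapper function are assumptions added purely for illustration.

    package example

    import "github.com/pingcap/tidb/bindinfo"

    // activeBindSQL returns the SQL text of the binding currently in use, if any.
    // FindUsingBinding is the helper added above; BindSQL is an assumed field name.
    func activeBindSQL(br *bindinfo.BindRecord) (string, bool) {
    	if b := br.FindUsingBinding(); b != nil {
    		return b.BindSQL, true
    	}
    	return "", false
    }
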
113 changes: 47 additions & 66 deletions br/pkg/checksum/executor_test.go
@@ -7,124 +7,105 @@ import (
"math"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestT(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testChecksumSuite{})

type testChecksumSuite struct {
mock *mock.Cluster
}

func (s *testChecksumSuite) SetUpSuite(c *C) {
var err error
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

func (s *testChecksumSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testChecksumSuite) getTableInfo(c *C, db, table string) *model.TableInfo {
info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64)
c.Assert(err, IsNil)
func getTableInfo(t *testing.T, mock *mock.Cluster, db, table string) *model.TableInfo {
info, err := mock.Domain.GetSnapshotInfoSchema(math.MaxUint64)
require.NoError(t, err)
cDBName := model.NewCIStr(db)
cTableName := model.NewCIStr(table)
tableInfo, err := info.TableByName(cDBName, cTableName)
c.Assert(err, IsNil)
require.NoError(t, err)
return tableInfo.Meta()
}

func (s *testChecksumSuite) TestChecksum(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()
func TestChecksum(t *testing.T) {
mock, err := mock.NewCluster()
require.NoError(t, err)
require.NoError(t, mock.Start())
defer mock.Stop()

tk := testkit.NewTestKit(c, s.mock.Storage)
tk := testkit.NewTestKit(t, mock.Storage)
tk.MustExec("use test")

tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1 (a int);")
tk.MustExec("insert into t1 values (10);")
tableInfo1 := s.getTableInfo(c, "test", "t1")
tableInfo1 := getTableInfo(t, mock, "test", "t1")
exe1, err := checksum.NewExecutorBuilder(tableInfo1, math.MaxUint64).
SetConcurrency(variable.DefChecksumTableConcurrency).
Build()
c.Assert(err, IsNil)
c.Assert(exe1.Each(func(r *kv.Request) error {
c.Assert(r.NotFillCache, IsTrue)
c.Assert(r.Concurrency, Equals, variable.DefChecksumTableConcurrency)
require.NoError(t, err)
require.NoError(t, exe1.Each(func(r *kv.Request) error {
require.True(t, r.NotFillCache)
require.Equal(t, variable.DefChecksumTableConcurrency, r.Concurrency)
return nil
}), IsNil)
c.Assert(exe1.Len(), Equals, 1)
resp, err := exe1.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {})
c.Assert(err, IsNil)
}))
require.Equal(t, 1, exe1.Len())
resp, err := exe1.Execute(context.TODO(), mock.Storage.GetClient(), func() {})
require.NoError(t, err)
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(resp.Checksum, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalKvs, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalBytes, Equals, uint64(1), Commentf("%v", resp))
require.Equalf(t, uint64(1), resp.Checksum, "%v", resp)
require.Equalf(t, uint64(1), resp.TotalKvs, "%v", resp)
require.Equalf(t, uint64(1), resp.TotalBytes, "%v", resp)

tk.MustExec("drop table if exists t2;")
tk.MustExec("create table t2 (a int);")
tk.MustExec("alter table t2 add index i2(a);")
tk.MustExec("insert into t2 values (10);")
tableInfo2 := s.getTableInfo(c, "test", "t2")
tableInfo2 := getTableInfo(t, mock, "test", "t2")
exe2, err := checksum.NewExecutorBuilder(tableInfo2, math.MaxUint64).Build()
c.Assert(err, IsNil)
c.Assert(exe2.Len(), Equals, 2, Commentf("%v", tableInfo2))
resp2, err := exe2.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {})
c.Assert(err, IsNil)
c.Assert(resp2.Checksum, Equals, uint64(0), Commentf("%v", resp2))
c.Assert(resp2.TotalKvs, Equals, uint64(2), Commentf("%v", resp2))
c.Assert(resp2.TotalBytes, Equals, uint64(2), Commentf("%v", resp2))
require.NoError(t, err)
require.Equalf(t, 2, exe2.Len(), "%v", tableInfo2)
resp2, err := exe2.Execute(context.TODO(), mock.Storage.GetClient(), func() {})
require.NoError(t, err)
require.Equalf(t, uint64(0), resp2.Checksum, "%v", resp2)
require.Equalf(t, uint64(2), resp2.TotalKvs, "%v", resp2)
require.Equalf(t, uint64(2), resp2.TotalBytes, "%v", resp2)

// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
tableInfo1 = getTableInfo(t, mock, "test", "t1")
oldTable := metautil.Table{Info: tableInfo1}
exe2, err = checksum.NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
c.Assert(exe2.Len(), Equals, 2)
require.NoError(t, err)
require.Equal(t, 2, exe2.Len())
rawReqs, err := exe2.RawRequests()
c.Assert(err, IsNil)
c.Assert(rawReqs, HasLen, 2)
require.NoError(t, err)
require.Len(t, rawReqs, 2)
for _, rawReq := range rawReqs {
c.Assert(rawReq.Rule, NotNil)
require.NotNil(t, rawReq.Rule)
}
resp2, err = exe2.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {})
c.Assert(err, IsNil)
c.Assert(resp2, NotNil)
resp2, err = exe2.Execute(context.TODO(), mock.Storage.GetClient(), func() {})
require.NoError(t, err)
require.NotNil(t, resp2)

// Test commonHandle ranges

tk.MustExec("drop table if exists t3;")
tk.MustExec("create table t3 (a char(255), b int, primary key(a) CLUSTERED);")
tk.MustExec("insert into t3 values ('fffffffff', 1), ('010101010', 2), ('394393fj39efefe', 3);")
tableInfo3 := s.getTableInfo(c, "test", "t3")
tableInfo3 := getTableInfo(t, mock, "test", "t3")
exe3, err := checksum.NewExecutorBuilder(tableInfo3, math.MaxUint64).Build()
c.Assert(err, IsNil)
require.NoError(t, err)
first := true
c.Assert(exe3.Each(func(req *kv.Request) error {
require.NoError(t, exe3.Each(func(req *kv.Request) error {
if first {
first = false
ranges, err := backup.BuildTableRanges(tableInfo3)
c.Assert(err, IsNil)
c.Assert(req.KeyRanges, DeepEquals, ranges[:1], Commentf("%v", req.KeyRanges))
require.NoError(t, err)
require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges)
}
return nil
}), IsNil)
}))
}
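Most of this file is a mechanical migration from pingcap/check suites to plain Go tests asserting with testify/require. A rough sketch of the mapping applied above, with placeholder values (illustrative only, not part of the commit):

    package example

    import (
    	"testing"

    	"github.com/stretchr/testify/require"
    )

    // TestAssertMapping lists the require equivalents of the old check assertions.
    func TestAssertMapping(t *testing.T) {
    	var err error
    	require.NoError(t, err)          // was c.Assert(err, IsNil)
    	require.Equal(t, 1, 1)           // was c.Assert(got, Equals, want); the expected value now comes first
    	require.True(t, true)            // was c.Assert(cond, IsTrue)
    	require.Len(t, []int{1, 2}, 2)   // was c.Assert(s, HasLen, 2)
    	require.NotNil(t, t)             // was c.Assert(v, NotNil)
    	require.Equalf(t, 1, 1, "%v", 1) // was Commentf("%v", v) as the trailing comment argument
    }
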
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtstatstest
package checksum

import (
"testing"
@@ -22,10 +22,11 @@ import (
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/klauspost/compress/zstd.(*blockDec).startDecoder"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testbridge.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
@@ -445,7 +445,6 @@ func (kvcodec *tableKVEncoder) Encode(
kvPairs := kvcodec.se.takeKvPairs()
for i := 0; i < len(kvPairs.pairs); i++ {
kvPairs.pairs[i].RowID = rowID
kvPairs.pairs[i].Offset = offset
}
kvcodec.recordCache = record[:0]
return kvPairs, nil
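The only functional change here is that Encode stops recording a per-row Offset on the emitted pairs; the test updates below drop the field from their expected literals accordingly. A schematic of the pair shape implied by the diff — the struct definition itself is not part of this commit, and the types are assumptions:

    package example

    // kvPair sketches common.KvPair as implied by this diff.
    type kvPair struct {
    	Key   []byte
    	Val   []byte
    	RowID int64
    	// Offset int64 -- per-row source offset, no longer populated by Encode after this commit
    }
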
72 changes: 30 additions & 42 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
@@ -108,10 +108,9 @@ func (s *kvSuite) TestEncode(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
RowID: 2,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
RowID: 2,
},
}})

@@ -136,10 +135,9 @@ func (s *kvSuite) TestEncode(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
RowID: 1,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
RowID: 1,
},
}})
}
@@ -262,8 +260,7 @@ func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
0x1, 0x0, // not null offsets = [1]
0x7f, // column version = 127 (10000000 clamped to TINYINT)
},
RowID: 1,
Offset: 1234,
RowID: 1,
},
}})
}
@@ -300,10 +297,9 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
RowID: 70,
},
}})
}
@@ -328,16 +324,14 @@ func (s *kvSuite) TestEncodeDoubleAutoIncrement(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
RowID: 70,
},
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoIncrementType).Base(), Equals, int64(70))
@@ -372,10 +366,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 70,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(70))
@@ -384,10 +377,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 71,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 71,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(71))
@@ -424,24 +416,20 @@ func (s *kvSuite) TestShardRowId(c *C) {
func (s *kvSuite) TestSplitIntoChunks(c *C) {
pairs := []common.KvPair{
{
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
Offset: 1000,
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
},
{
Key: []byte{7, 8},
Val: []byte{9, 0},
Offset: 2000,
Key: []byte{7, 8},
Val: []byte{9, 0},
},
{
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
Offset: 3000,
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
},
{
Key: []byte{9, 0},
Val: []byte{1, 2},
Offset: 4000,
Key: []byte{9, 0},
Val: []byte{1, 2},
},
}

6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -191,7 +191,7 @@ func NewDuplicateManager(local *local, ts uint64, opts *kv.SessionOptions) (*Dup
regionConcurrency: local.tcpConcurrency,
splitCli: local.splitCli,
tikvCli: local.tikvCli,
keyAdapter: duplicateKeyAdapter{},
keyAdapter: dupDetectKeyAdapter{},
ts: ts,
connPool: common.NewGRPCConns(),
// TODO: not sure what is the correct concurrency value.
@@ -495,7 +495,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
return err
}
@@ -570,7 +570,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
indexLogger.Error(
"[detect-dupe] decode key error when query handle for duplicate index",
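Taken together, these hunks swap duplicateKeyAdapter for dupDetectKeyAdapter and narrow Decode so call sites receive only the raw key. A hedged sketch of the interface change inferred from the call sites; the interface names, the meaning of the discarded values, and the types are assumptions:

    package example

    // Inferred shape of the key adapter's Decode before and after this commit.
    type keyAdapterBefore interface {
    	// Decode returned the raw key plus extra per-row metadata
    	// (likely row ID and offset, judging by the two discarded values).
    	Decode(dst, data []byte) ([]byte, int64, int64, error)
    }

    type keyAdapterAfter interface {
    	// Decode now returns only the raw key.
    	Decode(dst, data []byte) ([]byte, error)
    }
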