Skip to content

Commit

Permalink
executor: compute ADMIN CHECKSUM for partitioned tables correc… (#11089)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm authored and zz-jason committed Jul 15, 2019
1 parent 3f1d234 commit 368119b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
44 changes: 32 additions & 12 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,47 @@ func newChecksumContext(db *model.DBInfo, table *model.TableInfo, startTs uint64
}

func (c *checksumContext) BuildRequests(ctx sessionctx.Context) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0, len(c.TableInfo.Indices)+1)
req, err := c.buildTableRequest(ctx)
if err != nil {
var partDefs []model.PartitionDefinition
if part := c.TableInfo.Partition; part != nil {
partDefs = part.Definitions
}

reqs := make([]*kv.Request, 0, (len(c.TableInfo.Indices)+1)*(len(partDefs)+1))
if err := c.appendRequest(ctx, c.TableInfo.ID, &reqs); err != nil {
return nil, err
}
reqs = append(reqs, req)

for _, partDef := range partDefs {
if err := c.appendRequest(ctx, partDef.ID, &reqs); err != nil {
return nil, err
}
}

return reqs, nil
}

func (c *checksumContext) appendRequest(ctx sessionctx.Context, tableID int64, reqs *[]*kv.Request) error {
req, err := c.buildTableRequest(ctx, tableID)
if err != nil {
return err
}

*reqs = append(*reqs, req)
for _, indexInfo := range c.TableInfo.Indices {
if indexInfo.State != model.StatePublic {
continue
}
req, err = c.buildIndexRequest(ctx, indexInfo)
req, err = c.buildIndexRequest(ctx, tableID, indexInfo)
if err != nil {
return nil, err
return err
}
reqs = append(reqs, req)
*reqs = append(*reqs, req)
}

return reqs, nil
return nil
}

func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request, error) {
func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int64) (*kv.Request, error) {
checksum := &tipb.ChecksumRequest{
StartTs: c.StartTs,
ScanOn: tipb.ChecksumScanOn_Table,
Expand All @@ -217,13 +237,13 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request
ranges := ranger.FullIntRange(false)

var builder distsql.RequestBuilder
return builder.SetTableRanges(c.TableInfo.ID, ranges, nil).
return builder.SetTableRanges(tableID, ranges, nil).
SetChecksumRequest(checksum).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency).
Build()
}

func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *model.IndexInfo) (*kv.Request, error) {
func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int64, indexInfo *model.IndexInfo) (*kv.Request, error) {
checksum := &tipb.ChecksumRequest{
StartTs: c.StartTs,
ScanOn: tipb.ChecksumScanOn_Index,
Expand All @@ -233,7 +253,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *m
ranges := ranger.FullRange()

var builder distsql.RequestBuilder
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, c.TableInfo.ID, indexInfo.ID, ranges).
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency).
Build()
Expand Down
11 changes: 11 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,17 @@ func (s *testSuite) TestAdmin(c *C) {
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) TestAdminChecksumOfPartitionedTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("USE test;")
tk.MustExec("DROP TABLE IF EXISTS admin_checksum_partition_test;")
tk.MustExec("CREATE TABLE admin_checksum_partition_test (a INT) PARTITION BY HASH(a) PARTITIONS 4;")
tk.MustExec("INSERT INTO admin_checksum_partition_test VALUES (1), (2);")

r := tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;")
r.Check(testkit.Rows("test admin_checksum_partition_test 1 5 5"))
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
tk.MustExec("use test")
tk.MustExec(fmt.Sprintf("create table %s(id int not null default 1, name varchar(255), PRIMARY KEY(id));", table))
Expand Down

0 comments on commit 368119b

Please sign in to comment.