From 9af86df8152d9edd7bbc7b50eb37aec0e10ff73c Mon Sep 17 00:00:00 2001 From: kennytm Date: Fri, 5 Jul 2019 03:47:01 +0800 Subject: [PATCH] executor: compute ADMIN CHECKSUM for partitioned tables correctly --- executor/checksum.go | 44 ++++++++++++++++++++++++++++----------- executor/executor_test.go | 11 ++++++++++ 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/executor/checksum.go b/executor/checksum.go index 9ca8692088825..c84579fe85ee8 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -187,27 +187,47 @@ func newChecksumContext(db *model.DBInfo, table *model.TableInfo, startTs uint64 } func (c *checksumContext) BuildRequests(ctx sessionctx.Context) ([]*kv.Request, error) { - reqs := make([]*kv.Request, 0, len(c.TableInfo.Indices)+1) - req, err := c.buildTableRequest(ctx) - if err != nil { + var partDefs []model.PartitionDefinition + if part := c.TableInfo.Partition; part != nil { + partDefs = part.Definitions + } + + reqs := make([]*kv.Request, 0, (len(c.TableInfo.Indices)+1)*(len(partDefs)+1)) + if err := c.appendRequest(ctx, c.TableInfo.ID, &reqs); err != nil { return nil, err } - reqs = append(reqs, req) + + for _, partDef := range partDefs { + if err := c.appendRequest(ctx, partDef.ID, &reqs); err != nil { + return nil, err + } + } + + return reqs, nil +} + +func (c *checksumContext) appendRequest(ctx sessionctx.Context, tableID int64, reqs *[]*kv.Request) error { + req, err := c.buildTableRequest(ctx, tableID) + if err != nil { + return err + } + + *reqs = append(*reqs, req) for _, indexInfo := range c.TableInfo.Indices { if indexInfo.State != model.StatePublic { continue } - req, err = c.buildIndexRequest(ctx, indexInfo) + req, err = c.buildIndexRequest(ctx, tableID, indexInfo) if err != nil { - return nil, err + return err } - reqs = append(reqs, req) + *reqs = append(*reqs, req) } - return reqs, nil + return nil } -func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request, error) { +func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int64) (*kv.Request, error) { checksum := &tipb.ChecksumRequest{ StartTs: c.StartTs, ScanOn: tipb.ChecksumScanOn_Table, @@ -217,13 +237,13 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request ranges := ranger.FullIntRange(false) var builder distsql.RequestBuilder - return builder.SetTableRanges(c.TableInfo.ID, ranges, nil). + return builder.SetTableRanges(tableID, ranges, nil). SetChecksumRequest(checksum). SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency). Build() } -func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *model.IndexInfo) (*kv.Request, error) { +func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int64, indexInfo *model.IndexInfo) (*kv.Request, error) { checksum := &tipb.ChecksumRequest{ StartTs: c.StartTs, ScanOn: tipb.ChecksumScanOn_Index, @@ -233,7 +253,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *m ranges := ranger.FullRange() var builder distsql.RequestBuilder - return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, c.TableInfo.ID, indexInfo.ID, ranges). + return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency). Build() diff --git a/executor/executor_test.go b/executor/executor_test.go index d63ef2fec49e9..7821e9b26802e 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -411,6 +411,17 @@ func (s *testSuite) TestAdmin(c *C) { c.Assert(historyJobs, DeepEquals, historyJobs2) } +func (s *testSuite) TestAdminChecksumOfPartitionedTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("USE test;") + tk.MustExec("DROP TABLE IF EXISTS admin_checksum_partition_test;") + tk.MustExec("CREATE TABLE admin_checksum_partition_test (a INT) PARTITION BY HASH(a) PARTITIONS 4;") + tk.MustExec("INSERT INTO admin_checksum_partition_test VALUES (1), (2);") + + r := tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;") + r.Check(testkit.Rows("test admin_checksum_partition_test 1 5 5")) +} + func (s *testSuite) fillData(tk *testkit.TestKit, table string) { tk.MustExec("use test") tk.MustExec(fmt.Sprintf("create table %s(id int not null default 1, name varchar(255), PRIMARY KEY(id));", table))