From 368119b8df10d65a7575a90de8d77def660ac5ca Mon Sep 17 00:00:00 2001 From: kennytm Date: Tue, 16 Jul 2019 07:39:17 +0800 Subject: [PATCH] =?UTF-8?q?executor:=20compute=20ADMIN=20CHECKSUM=20for=20?= =?UTF-8?q?partitioned=20tables=20correc=E2=80=A6=20(#11089)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- executor/checksum.go | 44 ++++++++++++++++++++++++++++----------- executor/executor_test.go | 11 ++++++++++ 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/executor/checksum.go b/executor/checksum.go index 9ca8692088825..c84579fe85ee8 100644 --- a/executor/checksum.go +++ b/executor/checksum.go @@ -187,27 +187,47 @@ func newChecksumContext(db *model.DBInfo, table *model.TableInfo, startTs uint64 } func (c *checksumContext) BuildRequests(ctx sessionctx.Context) ([]*kv.Request, error) { - reqs := make([]*kv.Request, 0, len(c.TableInfo.Indices)+1) - req, err := c.buildTableRequest(ctx) - if err != nil { + var partDefs []model.PartitionDefinition + if part := c.TableInfo.Partition; part != nil { + partDefs = part.Definitions + } + + reqs := make([]*kv.Request, 0, (len(c.TableInfo.Indices)+1)*(len(partDefs)+1)) + if err := c.appendRequest(ctx, c.TableInfo.ID, &reqs); err != nil { return nil, err } - reqs = append(reqs, req) + + for _, partDef := range partDefs { + if err := c.appendRequest(ctx, partDef.ID, &reqs); err != nil { + return nil, err + } + } + + return reqs, nil +} + +func (c *checksumContext) appendRequest(ctx sessionctx.Context, tableID int64, reqs *[]*kv.Request) error { + req, err := c.buildTableRequest(ctx, tableID) + if err != nil { + return err + } + + *reqs = append(*reqs, req) for _, indexInfo := range c.TableInfo.Indices { if indexInfo.State != model.StatePublic { continue } - req, err = c.buildIndexRequest(ctx, indexInfo) + req, err = c.buildIndexRequest(ctx, tableID, indexInfo) if err != nil { - return nil, err + return err } - reqs = append(reqs, req) + *reqs = append(*reqs, req) } - return reqs, nil + return nil } -func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request, error) { +func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int64) (*kv.Request, error) { checksum := &tipb.ChecksumRequest{ StartTs: c.StartTs, ScanOn: tipb.ChecksumScanOn_Table, @@ -217,13 +237,13 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context) (*kv.Request ranges := ranger.FullIntRange(false) var builder distsql.RequestBuilder - return builder.SetTableRanges(c.TableInfo.ID, ranges, nil). + return builder.SetTableRanges(tableID, ranges, nil). SetChecksumRequest(checksum). SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency). Build() } -func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *model.IndexInfo) (*kv.Request, error) { +func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int64, indexInfo *model.IndexInfo) (*kv.Request, error) { checksum := &tipb.ChecksumRequest{ StartTs: c.StartTs, ScanOn: tipb.ChecksumScanOn_Index, @@ -233,7 +253,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, indexInfo *m ranges := ranger.FullRange() var builder distsql.RequestBuilder - return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, c.TableInfo.ID, indexInfo.ID, ranges). + return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges). SetChecksumRequest(checksum). SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency). Build() diff --git a/executor/executor_test.go b/executor/executor_test.go index e860a6e200d52..b68bebd4bfefb 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -412,6 +412,17 @@ func (s *testSuite) TestAdmin(c *C) { c.Assert(historyJobs, DeepEquals, historyJobs2) } +func (s *testSuite) TestAdminChecksumOfPartitionedTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("USE test;") + tk.MustExec("DROP TABLE IF EXISTS admin_checksum_partition_test;") + tk.MustExec("CREATE TABLE admin_checksum_partition_test (a INT) PARTITION BY HASH(a) PARTITIONS 4;") + tk.MustExec("INSERT INTO admin_checksum_partition_test VALUES (1), (2);") + + r := tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;") + r.Check(testkit.Rows("test admin_checksum_partition_test 1 5 5")) +} + func (s *testSuite) fillData(tk *testkit.TestKit, table string) { tk.MustExec("use test") tk.MustExec(fmt.Sprintf("create table %s(id int not null default 1, name varchar(255), PRIMARY KEY(id));", table))