Skip to content

Commit

Permalink
*: Move StartTs from DAGRequest to Coprocessor.Request (#13823)
Browse files Browse the repository at this point in the history
  • Loading branch information
breezewish authored and sre-bot committed Dec 3, 2019
1 parent 236f5ef commit a7d5b98
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 39 deletions.
6 changes: 3 additions & 3 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@ func constructLimitPB(count uint64) *tipb.Executor {
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

func buildDescTableScanDAG(ctx sessionctx.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = startTS
_, timeZoneOffset := time.Now().In(time.UTC).Zone()
dagReq.TimeZoneOffset = int64(timeZoneOffset)
for i := range columns {
Expand All @@ -249,14 +248,15 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
// buildDescTableScan builds a desc table scan upon tblInfo.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
sctx := newContext(d.store)
dagPB, err := buildDescTableScanDAG(sctx, startTS, tbl, columns, limit)
dagPB, err := buildDescTableScanDAG(sctx, tbl, columns, limit)
if err != nil {
return nil, errors.Trace(err)
}
ranges := ranger.FullIntRange(false)
var builder distsql.RequestBuilder
builder.SetTableRanges(tbl.GetPhysicalID(), ranges, nil).
SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
SetConcurrency(1).SetDesc(true)

Expand Down
9 changes: 6 additions & 3 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (builder *RequestBuilder) SetTableHandles(tid int64, handles []int64) *Requ
func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuilder {
if builder.err == nil {
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.StartTs = dag.StartTs
builder.Request.Data, builder.err = dag.Marshal()
}

Expand All @@ -87,7 +86,6 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestBuilder {
if builder.err == nil {
builder.Request.Tp = kv.ReqTypeAnalyze
builder.Request.StartTs = ana.StartTs
builder.Request.Data, builder.err = ana.Marshal()
builder.Request.NotFillCache = true
builder.Request.IsolationLevel = kv.RC
Expand All @@ -101,7 +99,6 @@ func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestB
func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest) *RequestBuilder {
if builder.err == nil {
builder.Request.Tp = kv.ReqTypeChecksum
builder.Request.StartTs = checksum.StartTs
builder.Request.Data, builder.err = checksum.Marshal()
builder.Request.NotFillCache = true
}
Expand All @@ -115,6 +112,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetStartTS sets "StartTS" for "kv.Request".
func (builder *RequestBuilder) SetStartTS(startTS uint64) *RequestBuilder {
builder.Request.StartTs = startTS
return builder
}

// SetDesc sets "Desc" for "kv.Request".
func (builder *RequestBuilder) SetDesc(desc bool) *RequestBuilder {
builder.Request.Desc = desc
Expand Down
12 changes: 6 additions & 6 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
expect := &kv.Request{
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: []kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
expect := &kv.Request{
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: []kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand Down Expand Up @@ -411,7 +411,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
expect := &kv.Request{
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: []kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Expand Down Expand Up @@ -474,7 +474,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
expect := &kv.Request{
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: keyRanges,
KeepOrder: false,
Desc: false,
Expand Down Expand Up @@ -518,7 +518,7 @@ func (s *testSuite) TestRequestBuilder5(c *C) {
expect := &kv.Request{
Tp: 104,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x10, 0x0, 0x18, 0x0, 0x20, 0x0},
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0},
KeyRanges: keyRanges,
KeepOrder: true,
Desc: false,
Expand Down Expand Up @@ -551,7 +551,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) {
expect := &kv.Request{
Tp: 105,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x10, 0x0, 0x18, 0x0},
Data: []uint8{0x10, 0x0, 0x18, 0x0},
KeyRanges: keyRanges,
KeepOrder: false,
Desc: false,
Expand Down
16 changes: 8 additions & 8 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,14 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
return err
}
sc := e.ctx.GetSessionVars().StmtCtx
txn, err := e.ctx.Txn(true)
if err != nil {
return nil
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetIndexRanges(sc, e.table.ID, e.index.ID, ranger.FullRange()).
SetDAGRequest(dagPB).
SetStartTS(txn.StartTS()).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand All @@ -128,11 +133,6 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {

func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
txn, err := e.ctx.Txn(true)
if err != nil {
return nil, err
}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
Expand All @@ -142,7 +142,7 @@ func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)

err = plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
err := plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,7 +231,6 @@ func (e *RecoverIndexExec) constructLimitPB(count uint64) *tipb.Executor {

func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
Expand Down Expand Up @@ -264,6 +263,7 @@ func (e *RecoverIndexExec) buildTableScan(ctx context.Context, txn kv.Transactio
var builder distsql.RequestBuilder
kvReq, err := builder.SetTableRanges(tblInfo.ID, ranges, nil).
SetDAGRequest(dagPB).
SetStartTS(txn.StartTS()).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down Expand Up @@ -631,6 +631,7 @@ func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transactio
ranges := ranger.FullRange()
kvReq, err := builder.SetIndexRanges(sc, e.table.Meta().ID, e.index.Meta().ID, ranges).
SetDAGRequest(dagPB).
SetStartTS(txn.StartTS()).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down Expand Up @@ -668,7 +669,6 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {

func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
Expand Down
2 changes: 2 additions & 0 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
var builder distsql.RequestBuilder
kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.idxInfo.ID, ranges).
SetAnalyzeRequest(e.analyzePB).
SetStartTS(math.MaxUint64).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
Build()
Expand Down Expand Up @@ -429,6 +430,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
// correct `correlation` of columns.
kvReq, err := builder.SetTableRanges(e.physicalTableID, ranges, nil).
SetAnalyzeRequest(e.analyzePB).
SetStartTS(math.MaxUint64).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
Build()
Expand Down
26 changes: 20 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,6 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde
concurrency: b.ctx.GetSessionVars().IndexSerialScanConcurrency,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeIndex,
StartTs: math.MaxUint64,
Flags: sc.PushDownFlags(),
TimeZoneOffset: offset,
},
Expand Down Expand Up @@ -1520,7 +1519,6 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeColumn,
StartTs: math.MaxUint64,
Flags: sc.PushDownFlags(),
TimeZoneOffset: offset,
},
Expand Down Expand Up @@ -1698,10 +1696,6 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
dagReq = &tipb.DAGRequest{}
dagReq.StartTs, err = b.getStartTS()
if err != nil {
return nil, false, err
}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location())
sc := b.ctx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
Expand Down Expand Up @@ -1935,9 +1929,14 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
startTS, err := b.getStartTS()
if err != nil {
return nil, err
}
e := &TableReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
startTS: startTS,
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
Expand Down Expand Up @@ -1997,9 +1996,14 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
} else {
physicalTableID = is.Table.ID
}
startTS, err := b.getStartTS()
if err != nil {
return nil, err
}
e := &IndexReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
startTS: startTS,
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
Expand Down Expand Up @@ -2068,9 +2072,14 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
startTS, err := b.getStartTS()
if err != nil {
return nil, err
}
e := &IndexLookUpExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
Expand Down Expand Up @@ -2199,11 +2208,16 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
colExec := true
e.dagPB.CollectExecutionSummaries = &colExec
}
startTS, err := builder.getStartTS()
if err != nil {
return nil, err
}

sort.Sort(sortutil.Int64Slice(handles))
var b distsql.RequestBuilder
kvReq, err := b.SetTableHandles(getPhysicalTableID(e.table), handles).
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
Expand Down
4 changes: 2 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ func (c *checksumContext) appendRequest(ctx sessionctx.Context, tableID int64, r

func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int64) (*kv.Request, error) {
checksum := &tipb.ChecksumRequest{
StartTs: c.StartTs,
ScanOn: tipb.ChecksumScanOn_Table,
Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
}
Expand All @@ -239,13 +238,13 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
var builder distsql.RequestBuilder
return builder.SetTableRanges(tableID, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency).
Build()
}

func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int64, indexInfo *model.IndexInfo) (*kv.Request, error) {
checksum := &tipb.ChecksumRequest{
StartTs: c.StartTs,
ScanOn: tipb.ChecksumScanOn_Index,
Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
}
Expand All @@ -255,6 +254,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
var builder distsql.RequestBuilder
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency).
Build()
}
Expand Down
5 changes: 5 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type IndexReaderExecutor struct {
// kvRanges are only used for union scan.
kvRanges []kv.KeyRange
dagPB *tipb.DAGRequest
startTS uint64

// result returns one or more distsql.PartialResult and each PartialResult is returned by one region.
result distsql.SelectResult
Expand Down Expand Up @@ -292,6 +293,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
Expand Down Expand Up @@ -321,6 +323,7 @@ type IndexLookUpExecutor struct {
desc bool
ranges []*ranger.Range
dagPB *tipb.DAGRequest
startTS uint64
// handleIdx is the index of handle, which is only used for case of keeping order.
handleIdx int
tableRequest *tipb.DAGRequest
Expand Down Expand Up @@ -438,6 +441,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
Expand Down Expand Up @@ -528,6 +532,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
baseExecutor: newBaseExecutor(e.ctx, e.schema, stringutil.MemoizeStr(func() string { return e.id.String() + "_tableReader" })),
table: e.table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
Expand Down
2 changes: 2 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type TableReaderExecutor struct {
// kvRanges are only use for union scan.
kvRanges []kv.KeyRange
dagPB *tipb.DAGRequest
startTS uint64
// columns are only required by union scan and virtual column.
columns []*model.ColumnInfo

Expand Down Expand Up @@ -207,6 +208,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
var builder distsql.RequestBuilder
kvReq, err := builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d
github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191127110312-37cd7d635816
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down Expand Up @@ -78,3 +78,5 @@ require (
go 1.13

replace github.com/pingcap/check => github.com/tiancaiamao/check v0.0.0-20191119042138-8e73d07b629d

replace github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20191129094129-04ea15eb054e
Loading

0 comments on commit a7d5b98

Please sign in to comment.