Skip to content

Commit

Permalink
*: add resource group name for cancel/establish req for tiflash (#47261)
Browse files Browse the repository at this point in the history
* *: add resource group name for cancel/establish req for tiflash
---------

Signed-off-by: guo-shaoge <[email protected]>
  • Loading branch information
guo-shaoge authored Sep 26, 2023
1 parent 7c1d60b commit ee719a4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
14 changes: 8 additions & 6 deletions executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,19 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) err
return errors.Trace(err)
}

rgName := c.sessionCtx.GetSessionVars().ResourceGroupName
if !variable.EnableResourceControl.Load() {
rgName = ""
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)),
zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()),
zap.Uint64("GatherID", c.gatherID),
zap.String("resource_group", rgName),
)
rgName := c.sessionCtx.GetSessionVars().ResourceGroupName
if !variable.EnableResourceControl.Load() {
rgName = ""
}
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
Expand Down Expand Up @@ -455,8 +456,9 @@ func (c *localMppCoordinator) handleDispatchReq(ctx context.Context, bo *backoff
}
// only root task should establish a stream conn with tiFlash to receive result.
taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, GatherId: c.gatherID, QueryTs: req.MppQueryID.QueryTs, LocalQueryId: req.MppQueryID.LocalQueryID, TaskId: req.ID, ServerId: req.MppQueryID.ServerID,
Address: req.Meta.GetAddress(),
MppVersion: req.MppVersion.ToInt64(),
Address: req.Meta.GetAddress(),
MppVersion: req.MppVersion.ToInt64(),
ResourceGroupName: req.ResourceGroupName,
}
c.receiveResults(req, taskMeta, bo)
}
Expand Down
24 changes: 10 additions & 14 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -102,18 +101,14 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di
}
}

rgName := req.ResourceGroupName
if !variable.EnableResourceControl.Load() {
rgName = ""
}
// meta for current task.
taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, QueryTs: req.MppQueryID.QueryTs, LocalQueryId: req.MppQueryID.LocalQueryID, TaskId: req.ID, ServerId: req.MppQueryID.ServerID,
GatherId: req.GatherID,
Address: req.Meta.GetAddress(),
CoordinatorAddress: req.CoordinatorAddress,
ReportExecutionSummary: req.ReportExecutionSummary,
MppVersion: req.MppVersion.ToInt64(),
ResourceGroupName: rgName,
ResourceGroupName: req.ResourceGroupName,
}

mppReq := &mpp.DispatchTaskRequest{
Expand Down Expand Up @@ -203,7 +198,7 @@ func (c *MPPClient) CancelMPPTasks(param kv.CancelMPPTasksParam) {

firstReq := reqs[0]
killReq := &mpp.CancelTaskRequest{
Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64()},
Meta: &mpp.TaskMeta{StartTs: firstReq.StartTs, GatherId: firstReq.GatherID, QueryTs: firstReq.MppQueryID.QueryTs, LocalQueryId: firstReq.MppQueryID.LocalQueryID, ServerId: firstReq.MppQueryID.ServerID, MppVersion: firstReq.MppVersion.ToInt64(), ResourceGroupName: firstReq.ResourceGroupName},
}

wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{})
Expand Down Expand Up @@ -239,13 +234,14 @@ func (c *MPPClient) EstablishMPPConns(param kv.EstablishMPPConnsParam) (*tikvrpc
connReq := &mpp.EstablishMPPConnectionRequest{
SenderMeta: taskMeta,
ReceiverMeta: &mpp.TaskMeta{
StartTs: req.StartTs,
GatherId: req.GatherID,
QueryTs: req.MppQueryID.QueryTs,
LocalQueryId: req.MppQueryID.LocalQueryID,
ServerId: req.MppQueryID.ServerID,
MppVersion: req.MppVersion.ToInt64(),
TaskId: -1,
StartTs: req.StartTs,
GatherId: req.GatherID,
QueryTs: req.MppQueryID.QueryTs,
LocalQueryId: req.MppQueryID.LocalQueryID,
ServerId: req.MppQueryID.ServerID,
MppVersion: req.MppVersion.ToInt64(),
TaskId: -1,
ResourceGroupName: req.ResourceGroupName,
},
}

Expand Down

0 comments on commit ee719a4

Please sign in to comment.