Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: fix build batchCop in disaggregated tiflash mode #40008

Merged
merged 22 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
727a0a7
store/copr: fix build batchCop in disaggregated tiflash mode
guo-shaoge Dec 17, 2022
281ddbe
update go.mod
guo-shaoge Dec 19, 2022
6405b32
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Dec 19, 2022
269a002
update bazel
guo-shaoge Dec 19, 2022
1441516
update err msg
guo-shaoge Dec 19, 2022
6e0b747
Merge branch 'disaggregated_batch_copr' of github.com:guo-shaoge/tidb…
guo-shaoge Dec 19, 2022
697b5fe
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Dec 26, 2022
e19720c
fix
guo-shaoge Dec 26, 2022
0176ad5
fix
guo-shaoge Dec 26, 2022
d9fb7e3
add failpoint
guo-shaoge Dec 29, 2022
55d3ddf
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Dec 29, 2022
058e2b2
update bazel
guo-shaoge Dec 29, 2022
f9bee04
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Jan 2, 2023
298cfbb
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Jan 3, 2023
110af29
trivial fix
guo-shaoge Jan 3, 2023
5fefec7
Merge branch 'disaggregated_batch_copr' of github.com:guo-shaoge/tidb…
guo-shaoge Jan 3, 2023
bd76d19
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Jan 3, 2023
0b25423
Merge branch 'master' into disaggregated_batch_copr
breezewish Jan 4, 2023
72f26a8
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
3e7b41c
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
0a4261e
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
3ffc655
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ go_library(
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stathat_consistent//:consistent",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
Expand Down
216 changes: 147 additions & 69 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/util/logutil"
"github.com/stathat/consistent"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -323,40 +323,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
storeTaskMap[taskStoreID] = batchTask
}
} else {
logutil.BgLogger().Info("detecting available mpp stores")
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores()
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]

// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

tikvClient := kvStore.GetTiKVClient()
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

mu.Lock()
defer mu.Unlock()
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
}
}(i)
aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
for _, s := range aliveStores {
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
}
}
wg.Wait()
}

var candidateRegionInfos []RegionInfo
Expand Down Expand Up @@ -513,7 +488,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
balanceWithContinuity bool,
balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl)
}
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
Expand All @@ -528,7 +503,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
balanceContinuousRegionCount int64,
partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl)
} else {
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
Expand All @@ -540,49 +515,152 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
return batchTasks, nil
}

func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
if err != nil {
return nil, err
}
cache := store.GetRegionCache()
stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
}
if len(stores) == 0 {
return nil, errors.New("No available tiflash_compute node")
}
func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store {
var aliveStores []*tikv.Store
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
for i := range stores {
go func(idx int) {
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
s := stores[idx]

// Check if store is failed already.
if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok {
return
}

tikvClient := kvStore.GetTiKVClient()
if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

hasher := consistent.New()
for _, store := range stores {
hasher.Add(store.GetAddr())
mu.Lock()
defer mu.Unlock()
aliveStores = append(aliveStores, s)
}(i)
}
for _, task := range batchTasks {
addr, err := hasher.Get(task.storeAddr)
wg.Wait()

logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores)))
return aliveStores
}

// 1. Split range by region location to build copTasks.
// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash.
// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask.
func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,
kvStore *kvStore,
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
const cmdType = tikvrpc.CmdBatchCop
var retryNum int
cache := kvStore.GetRegionCache()

for {
retryNum++
var rangesLen int
tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)

for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
for _, lo := range locations {
tasks = append(tasks, &copTask{
region: lo.Location.Region,
ranges: lo.Ranges,
cmdType: cmdType,
storeType: storeType,
partitionIndex: int64(i),
})
regionIDs = append(regionIDs, lo.Location.Region)
}
}

stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
}
var store *tikv.Store
for _, s := range stores {
if s.GetAddr() == addr {
store = s
break
stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
if len(stores) == 0 {
return nil, errors.New("tiflash_compute node is unavailable")
}

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
if err != nil {
return nil, err
}
if rpcCtxs == nil {
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
if store == nil {
return nil, errors.New("cannot find tiflash_compute store: " + addr)
if len(rpcCtxs) != len(tasks) {
return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
}

task.storeAddr = addr
task.ctx.Store = store
task.ctx.Addr = addr
}
logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks)))
for _, task := range batchTasks {
logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos)))
taskMap := make(map[string]*batchCopTask)
for i, rpcCtx := range rpcCtxs {
regionInfo := RegionInfo{
// tasks and rpcCtxs are correspond to each other.
Region: tasks[i].region,
Meta: rpcCtx.Meta,
Ranges: tasks[i].ranges,
AllStores: []uint64{rpcCtx.Store.StoreID()},
PartitionIndex: tasks[i].partitionIndex,
}
if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []RegionInfo{regionInfo},
}
taskMap[rpcCtx.Addr] = batchTask
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
break
}
return batchTasks, nil

failpointCheckForConsistentHash(res)
return res, nil
}

func failpointCheckForConsistentHash(tasks []*batchCopTask) {
failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) {
logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes")

// This failpoint will be tested in test-infra case, because we needs setup a cluster.
// All tiflash_compute nodes addrs are stored in val, separated by semicolon.
str := val.(string)
addrs := strings.Split(str, ";")
if len(addrs) < 1 {
err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str)
panic(err)
}
addrMap := make(map[string]struct{})
for _, addr := range addrs {
addrMap[addr] = struct{}{}
}
for _, batchTask := range tasks {
if _, ok := addrMap[batchTask.storeAddr]; !ok {
err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v(tiflash_compute nodes: %s)", batchTask.storeAddr, str)
panic(err)
}
}
})
}

// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
Expand Down
30 changes: 12 additions & 18 deletions store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand All @@ -36,7 +35,6 @@ type RegionInfo struct {
Ranges *KeyRanges
AllStores []uint64
PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table
Addr string
}

func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo {
Expand Down Expand Up @@ -100,22 +98,18 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
return tikverr.ErrTiDBShuttingDown
}

if config.GetGlobalConfig().DisaggregatedTiFlash {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this if branch because there will be no batchCop protocol in disaggregated tiflash mode. Also mpp error handling will be handled in mpp.go

ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
} else {
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
}
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
return errors.Trace(err)
}
22 changes: 12 additions & 10 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo)
rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
if err != nil && disaggregatedTiFlash {
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
}
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
if sender.GetRPCError() != nil {
logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
if disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
err = derr.ErrTiFlashServerTimeout
Expand All @@ -275,7 +275,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
retry = false
} else if err != nil {
if disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
retry = true
Expand Down Expand Up @@ -355,6 +355,7 @@ func (m *mppIterator) cancelMppTasks() {
}

// send cancel cmd to all stores where tasks run
gotErr := atomic.Bool{}
wg := util.WaitGroupWrapper{}
for addr := range usedStoreAddrs {
storeAddr := addr
Expand All @@ -363,13 +364,14 @@ func (m *mppIterator) cancelMppTasks() {
logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr))
if err != nil {
logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr))
if disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
}
gotErr.CompareAndSwap(false, true)
}
})
}
wg.Wait()
if gotErr.Load() && disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
}

func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
Expand All @@ -396,15 +398,15 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
if disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
m.sendError(derr.ErrTiFlashServerTimeout)
} else {
m.sendError(err)
}
if disaggregatedTiFlash {
m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
}
return
}

Expand Down