diff --git a/config/config.go b/config/config.go
index 4a6b28ed12a1d..2352a17acf0d1 100644
--- a/config/config.go
+++ b/config/config.go
@@ -279,6 +279,7 @@ type Config struct {
 	Plugin               Plugin `toml:"plugin" json:"plugin"`
 	MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
 	RunDDL               bool   `toml:"run-ddl" json:"run-ddl"`
+	DisaggregatedTiFlash bool   `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
 	// TiDBMaxReuseChunk indicates max cached chunk num
 	TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
 	// TiDBMaxReuseColumn indicates max cached column num
@@ -988,6 +989,7 @@ var defaultConf = Config{
 	NewCollationsEnabledOnFirstBootstrap: true,
 	EnableGlobalKill:                     true,
 	TrxSummary:                           DefaultTrxSummary(),
+	DisaggregatedTiFlash:                 false,
 	TiDBMaxReuseChunk:                    64,
 	TiDBMaxReuseColumn:                   256,
 }
diff --git a/domain/domain.go b/domain/domain.go
index 128260c324287..60779d463b672 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -70,6 +70,7 @@ import (
 	"github.com/pingcap/tidb/util/memoryusagealarm"
 	"github.com/pingcap/tidb/util/servermemorylimit"
 	"github.com/pingcap/tidb/util/sqlexec"
+	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv/transaction"
 	pd "github.com/tikv/pd/client"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -1416,6 +1417,64 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
 	return nil
 }
 
+// WatchTiFlashComputeNodeChange creates a routine that watches whether the topology of tiflash_compute nodes has changed.
+// TODO: tiflashComputeNodeKey is not put to etcd yet (finish this when AutoScaler is done).
+//
+// The store cache will only be invalidated every 30 seconds.
+func (do *Domain) WatchTiFlashComputeNodeChange() error {
+	var watchCh clientv3.WatchChan
+	if do.etcdClient != nil {
+		watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
+	}
+	do.wg.Add(1)
+	duration := 10 * time.Second
+	go func() {
+		defer func() {
+			do.wg.Done()
+			logutil.BgLogger().Info("WatchTiFlashComputeNodeChange exit")
+			util.Recover(metrics.LabelDomain, "WatchTiFlashComputeNodeChange", nil, false)
+		}()
+
+		var count int
+		var logCount int
+		for {
+			ok := true
+			var watched bool
+			select {
+			case <-do.exit:
+				return
+			case _, ok = <-watchCh:
+				watched = true
+			case <-time.After(duration):
+			}
+			if !ok {
+				logutil.BgLogger().Error("WatchTiFlashComputeNodeChange watch channel closed")
+				watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
+				count++
+				if count > 10 {
+					time.Sleep(time.Duration(count) * time.Second)
+				}
+				continue
+			}
+			count = 0
+			switch s := do.store.(type) {
+			case tikv.Storage:
+				logCount++
+				s.GetRegionCache().InvalidateTiFlashComputeStores()
+				if logCount == 60 {
+					// Print log every 60*duration seconds.
+					logutil.BgLogger().Debug("tiflash_compute store cache invalidated, will be refreshed by the next query", zap.Bool("watched", watched))
+					logCount = 0
+				}
+			default:
+				logutil.BgLogger().Debug("No need to watch tiflash_compute store cache for non-tikv store")
+				return
+			}
+		}
+	}()
+	return nil
+}
+
 // PrivilegeHandle returns the MySQLPrivilege.
 func (do *Domain) PrivilegeHandle() *privileges.Handle {
 	return do.privHandle
@@ -2078,8 +2137,9 @@ func (do *Domain) ServerMemoryLimitHandle() *servermemorylimit.Handle {
 }
 
 const (
-	privilegeKey   = "/tidb/privilege"
-	sysVarCacheKey = "/tidb/sysvars"
+	privilegeKey          = "/tidb/privilege"
+	sysVarCacheKey        = "/tidb/sysvars"
+	tiflashComputeNodeKey = "/tiflash/new_tiflash_compute_nodes"
 )
 
 // NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go
index ffd949ee28f66..baba133b3d5a7 100644
--- a/executor/tiflashtest/tiflash_test.go
+++ b/executor/tiflashtest/tiflash_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/parser/auth"
@@ -1278,3 +1279,27 @@ func TestBindingFromHistoryWithTiFlashBindable(t *testing.T) {
 	planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
 	tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with tiflash engine")
 }
+
+func TestDisaggregatedTiFlash(t *testing.T) {
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = true
+	})
+	store := testkit.CreateMockStore(t, withMockTiFlash(2))
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(c1 int)")
+	tk.MustExec("alter table t set tiflash replica 1")
+	tb := external.GetTableByName(t, tk, "test", "t")
+	err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+	require.NoError(t, err)
+	tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+
+	err = tk.ExecToErr("select * from t;")
+	require.Contains(t, err.Error(), "Please check tiflash_compute node is available")
+
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = false
+	})
+	tk.MustQuery("select * from t;").Check(testkit.Rows())
+}
diff --git a/go.mod b/go.mod
index c0af6f787e6fa..ca9c46da3977b 100644
--- a/go.mod
+++ b/go.mod
@@ -84,6 +84,7 @@ require (
 	github.com/soheilhy/cmux v0.1.5
 	github.com/spf13/cobra v1.6.1
 	github.com/spf13/pflag v1.0.5
+	github.com/stathat/consistent v1.0.0
 	github.com/stretchr/testify v1.8.0
 	github.com/tdakkota/asciicheck v0.1.1
 	github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
@@ -218,7 +219,6 @@ require (
 	github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
 	github.com/sirupsen/logrus v1.9.0 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
-	github.com/stathat/consistent v1.0.0 // indirect
 	github.com/tklauser/go-sysconf v0.3.10 // indirect
 	github.com/tklauser/numcpus v0.4.0 // indirect
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel
index dcbb25512f458..0495f1f50cae0 100644
--- a/planner/core/BUILD.bazel
+++ b/planner/core/BUILD.bazel
@@ -106,6 +106,7 @@ go_library(
         "//sessiontxn/staleread",
         "//statistics",
         "//statistics/handle",
+        "//store/driver/backoff",
         "//table",
         "//table/tables",
         "//table/temptable",
diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go
index b75942f3a2639..afc5223b9be94 100644
--- a/planner/core/find_best_task.go
+++ b/planner/core/find_best_task.go
@@ -20,6 +20,7 @@ import (
 	"strings"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/ast"
@@ -1985,7 +1986,8 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 			}
 		}
 	}
-	if prop.TaskTp == property.MppTaskType {
+	// In disaggregated tiflash mode, only MPP is allowed; Cop and BatchCop are deprecated.
+	if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
 		if ts.KeepOrder {
 			return invalidTask, nil
 		}
diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go
index 5067bcbfde551..6bc8a677bc6de 100644
--- a/planner/core/logical_plan_builder.go
+++ b/planner/core/logical_plan_builder.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
@@ -49,6 +50,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/statistics/handle"
+	"github.com/pingcap/tidb/store/driver/backoff"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/table/temptable"
@@ -65,6 +67,7 @@ import (
 	"github.com/pingcap/tidb/util/plancodec"
 	"github.com/pingcap/tidb/util/set"
 	"github.com/pingcap/tidb/util/size"
+	"github.com/tikv/client-go/v2/tikv"
 )
 
 const (
@@ -685,6 +688,13 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
 		ds.preferStoreType = 0
 		return
 	}
+	if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) {
+		// TiFlash is in disaggregated mode; we need to make sure a tiflash_compute node is available.
+		errMsg := "No available tiflash_compute node"
+		warning := ErrInternal.GenWithStack(errMsg)
+		ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
+		return
+	}
 	for _, path := range ds.possibleAccessPaths {
 		if path.StoreType == kv.TiFlash {
 			ds.preferStoreType |= preferTiFlash
@@ -702,6 +712,15 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
 	}
 }
 
+func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool {
+	bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil)
+	stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer())
+	if err != nil || len(stores) == 0 {
+		return false
+	}
+	return true
+}
+
 func resetNotNullFlag(schema *expression.Schema, start, end int) {
 	for i := start; i < end; i++ {
 		col := *schema.Columns[i]
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index faef8312178f7..d8677ea7dd072 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -1392,7 +1392,10 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
 	isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
 	availableEngine := map[kv.StoreType]struct{}{}
 	var availableEngineStr string
+	var outputComputeNodeErrMsg bool
+	noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx)
 	for i := len(paths) - 1; i >= 0; i-- {
+		// availableEngineStr is used in the warning message.
 		if _, ok := availableEngine[paths[i].StoreType]; !ok {
 			availableEngine[paths[i].StoreType] = struct{}{}
 			if availableEngineStr != "" {
@@ -1400,7 +1403,15 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
 			}
 			availableEngineStr += paths[i].StoreType.Name()
 		}
-		if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB {
+		_, exists := isolationReadEngines[paths[i].StoreType]
+		// Prune this path if:
+		// 1. path.StoreType doesn't exist in isolationReadEngines, or
+		// 2. TiFlash is disaggregated and the number of tiflash_compute nodes is zero.
+		shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
+		if shouldPruneTiFlashCompute {
+			outputComputeNodeErrMsg = true
+		}
+		if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute {
 			paths = append(paths[:i], paths[i+1:]...)
 		}
 	}
@@ -1409,7 +1420,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
 	if len(paths) == 0 {
 		helpMsg := ""
 		if engineVals == "tiflash" {
-			helpMsg = ". Please check tiflash replica or ensure the query is readonly"
+			if outputComputeNodeErrMsg {
+				helpMsg = ". Please check tiflash_compute node is available"
+			} else {
+				helpMsg = ". Please check tiflash replica or ensure the query is readonly"
+			}
 		}
 		err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg))
diff --git a/planner/core/task.go b/planner/core/task.go
index 19ad812e9aaa6..cc27029d83c8e 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1977,6 +1977,10 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 		}
 		attachPlan2Task(proj, newMpp)
 		return newMpp
+	case NoMpp:
+		t = mpp.convertToRootTask(p.ctx)
+		attachPlan2Task(p, t)
+		return t
 	default:
 		return invalidTask
 	}
diff --git a/session/session.go b/session/session.go
index 17acb8574833c..4852883207ade 100644
--- a/session/session.go
+++ b/session/session.go
@@ -3018,6 +3018,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
 		return nil, err
 	}
 
+	if config.GetGlobalConfig().DisaggregatedTiFlash {
+		// Invalidate the client-go tiflash_compute store cache if necessary.
+		err = dom.WatchTiFlashComputeNodeChange()
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
 		return nil, err
 	}
diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel
index 674ca8f6c54e1..eb3eb2f016424 100644
--- a/store/copr/BUILD.bazel
+++ b/store/copr/BUILD.bazel
@@ -41,6 +41,7 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_pingcap_log//:log",
         "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_stathat_consistent//:consistent",
         "@com_github_tikv_client_go_v2//config",
         "@com_github_tikv_client_go_v2//error",
        "@com_github_tikv_client_go_v2//metrics",
@@ -75,6 +76,7 @@ go_test(
         "//testkit/testsetup",
         "//util/paging",
         "@com_github_pingcap_kvproto//pkg/coprocessor",
+        "@com_github_stathat_consistent//:consistent",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//config",
         "@com_github_tikv_client_go_v2//testutils",
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index 5f6e435028e3b..801eebc40de9a 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -31,10 +31,12 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/mpp"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/driver/backoff"
 	derr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/stathat/consistent"
 	"github.com/tikv/client-go/v2/metrics"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
@@ -528,12 +530,34 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 	return ret
 }
 
-func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
+func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
+	store *kvStore,
+	ranges *KeyRanges,
+	storeType kv.StoreType,
+	mppStoreLastFailTime *sync.Map,
+	ttl time.Duration,
+	balanceWithContinuity bool,
+	balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
+	if config.GetGlobalConfig().DisaggregatedTiFlash {
+		return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+	}
 	return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 }
 
-func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) ([]*batchCopTask, error) {
-	batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
+	store *kvStore,
+	rangesForEachPhysicalTable []*KeyRanges,
+	storeType kv.StoreType,
+	mppStoreLastFailTime *sync.Map,
+	ttl time.Duration,
+	balanceWithContinuity bool,
+	balanceContinuousRegionCount int64,
+	partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
+	if config.GetGlobalConfig().DisaggregatedTiFlash {
+		batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+	} else {
+		batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -542,6 +566,51 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore
 	return batchTasks, nil
 }
 
+func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
+	batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+	if err != nil {
+		return nil, err
+	}
+	cache := store.GetRegionCache()
+	stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
+	if err != nil {
+		return nil, err
+	}
+	if len(stores) == 0 {
+		return nil, errors.New("No available tiflash_compute node")
+	}
+
+	hasher := consistent.New()
+	for _, store := range stores {
+		hasher.Add(store.GetAddr())
+	}
+	for _, task := range batchTasks {
+		addr, err := hasher.Get(task.storeAddr)
+		if err != nil {
+			return nil, err
+		}
+		var store *tikv.Store
+		for _, s := range stores {
+			if s.GetAddr() == addr {
+				store = s
+				break
+			}
+		}
+		if store == nil {
+			return nil, errors.New("cannot find tiflash_compute store: " + addr)
+		}
+
+		task.storeAddr = addr
+		task.ctx.Store = store
+		task.ctx.Addr = addr
+	}
+	logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks)))
+	for _, task := range batchTasks {
+		logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos)))
+	}
+	return batchTasks, nil
+}
+
 // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
 // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`.
 // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.
@@ -896,7 +965,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
 	if b.req.ResourceGroupTagger != nil {
 		b.req.ResourceGroupTagger(req)
 	}
-	req.StoreTp = tikvrpc.TiFlash
+	req.StoreTp = getEndPointType(kv.TiFlash)
 	logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
 	resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, readTimeoutUltraLong)
diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go
index aafa19071a392..5616f61c54365 100644
--- a/store/copr/batch_coprocessor_test.go
+++ b/store/copr/batch_coprocessor_test.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/pingcap/tidb/kv"
+	"github.com/stathat/consistent"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/tikv"
 )
@@ -150,3 +151,57 @@ func TestDeepCopyStoreTaskMap(t *testing.T) {
 		require.Equal(t, 2, len(task.regionInfos))
 	}
 }
+
+// Make sure there are no duplicated ip:port addresses.
+func generateOneAddr() string {
+	var ip string
+	for i := 0; i < 4; i++ {
+		if i != 0 {
+			ip += "."
+		}
+		ip += strconv.Itoa(rand.Intn(255))
+	}
+	return ip + ":" + strconv.Itoa(rand.Intn(65535))
+}
+
+func generateDifferentAddrs(num int) (res []string) {
+	addrMap := make(map[string]struct{})
+	for len(addrMap) < num {
+		addr := generateOneAddr()
+		if _, ok := addrMap[addr]; !ok {
+			addrMap[addr] = struct{}{}
+		}
+	}
+	for addr := range addrMap {
+		res = append(res, addr)
+	}
+	return
+}
+
+func TestConsistentHash(t *testing.T) {
+	allAddrs := generateDifferentAddrs(100)
+
+	computeNodes := allAddrs[:30]
+	storageNodes := allAddrs[30:]
+	firstRoundMap := make(map[string]string)
+	for round := 0; round < 100; round++ {
+		hasher := consistent.New()
+		rand.Shuffle(len(computeNodes), func(i, j int) {
+			computeNodes[i], computeNodes[j] = computeNodes[j], computeNodes[i]
+		})
+		for _, computeNode := range computeNodes {
+			hasher.Add(computeNode)
+		}
+		for _, storageNode := range storageNodes {
+			computeNode, err := hasher.Get(storageNode)
+			require.NoError(t, err)
+			if round == 0 {
+				firstRoundMap[storageNode] = computeNode
+			} else {
+				firstRoundAddr, ok := firstRoundMap[storageNode]
+				require.True(t, ok)
+				require.Equal(t, firstRoundAddr, computeNode)
+			}
+		}
+	}
+}
diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go
index b976d26a59ab3..3a3bd339f4e6c 100644
--- a/store/copr/batch_request_sender.go
+++ b/store/copr/batch_request_sender.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/config"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
@@ -35,6 +36,7 @@ type RegionInfo struct {
 	Ranges         *KeyRanges
 	AllStores      []uint64
 	PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table
+	Addr           string
 }
 
 func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo {
@@ -98,18 +100,22 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
 		return tikverr.ErrTiDBShuttingDown
 	}
 
-	// The reload region param is always true. Because that every time we try, we must
-	// re-build the range then re-create the batch sender. As a result, the len of "failStores"
-	// will change. If tiflash's replica is more than two, the "reload region" will always be false.
-	// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
-	// when meeting io error.
-	rc := RegionCache{ss.GetRegionCache()}
-	rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
+	if config.GetGlobalConfig().DisaggregatedTiFlash {
+		ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+	} else {
+		// The reload region param is always true. Because that every time we try, we must
+		// re-build the range then re-create the batch sender. As a result, the len of "failStores"
+		// will change. If tiflash's replica is more than two, the "reload region" will always be false.
+		// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
+		// when meeting io error.
+		rc := RegionCache{ss.GetRegionCache()}
+		rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
 
-	// Retry on send request failure when it's not canceled.
-	// When a store is not available, the leader of related region should be elected quickly.
-	// TODO: the number of retry time should be limited:since region may be unavailable
-	// when some unrecoverable disaster happened.
-	err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
+		// Retry on send request failure when it's not canceled.
+		// When a store is not available, the leader of related region should be elected quickly.
+		// TODO: the number of retry time should be limited:since region may be unavailable
+		// when some unrecoverable disaster happened.
+		err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
+	}
 	return errors.Trace(err)
 }
diff --git a/store/copr/mpp.go b/store/copr/mpp.go
index b75594e49177f..c89bd38eebe6f 100644
--- a/store/copr/mpp.go
+++ b/store/copr/mpp.go
@@ -235,19 +235,24 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 		}
 	}
 
+	disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
 	wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPTask, mppReq, kvrpcpb.Context{})
-	wrappedReq.StoreTp = tikvrpc.TiFlash
+	wrappedReq.StoreTp = getEndPointType(kv.TiFlash)
 
 	// TODO: Handle dispatch task response correctly, including retry logic and cancel logic.
 	var rpcResp *tikvrpc.Response
 	var err error
 	var retry bool
+
 	// If copTasks is not empty, we should send request according to region distribution.
 	// Or else it's the task without region, which always happens in high layer task without table.
 	// In that case
 	if originalTask != nil {
 		sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo)
 		rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
+		if err != nil && disaggregatedTiFlash {
+			m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+		}
 		// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
 		// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
 		// That's a hard job but we can try it in the future.
@@ -265,6 +270,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 		if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
 			retry = false
 		} else if err != nil {
+			if disaggregatedTiFlash {
+				m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+			}
 			if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
 				retry = true
 			}
@@ -327,8 +335,9 @@ func (m *mppIterator) cancelMppTasks() {
 		Meta: &mpp.TaskMeta{StartTs: m.startTs},
 	}
 
+	disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
 	wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{})
-	wrappedReq.StoreTp = tikvrpc.TiFlash
+	wrappedReq.StoreTp = getEndPointType(kv.TiFlash)
 
 	usedStoreAddrs := make(map[string]bool)
 	for _, task := range m.tasks {
@@ -350,6 +359,9 @@ func (m *mppIterator) cancelMppTasks() {
 			logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr))
 			if err != nil {
 				logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr))
+				if disaggregatedTiFlash {
+					m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+				}
 			}
 		})
 	}
@@ -365,8 +377,11 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
 		},
 	}
 
+	var err error
+	disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
+
 	wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPConn, connReq, kvrpcpb.Context{})
-	wrappedReq.StoreTp = tikvrpc.TiFlash
+	wrappedReq.StoreTp = getEndPointType(kv.TiFlash)
 
 	// Drain results from root task.
 	// We don't need to process any special error. When we meet errors, just let it fail.
@@ -380,6 +395,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
 		} else {
 			m.sendError(err)
 		}
+		if disaggregatedTiFlash {
+			m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+		}
 		return
 	}
diff --git a/store/copr/store.go b/store/copr/store.go
index ad7ebb5dd9a63..758109b81d805 100644
--- a/store/copr/store.go
+++ b/store/copr/store.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	tidb_config "github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/driver/backoff"
 	derr "github.com/pingcap/tidb/store/driver/error"
@@ -122,6 +123,9 @@ func getEndPointType(t kv.StoreType) tikvrpc.EndpointType {
 	case kv.TiKV:
 		return tikvrpc.TiKV
 	case kv.TiFlash:
+		if tidb_config.GetGlobalConfig().DisaggregatedTiFlash {
+			return tikvrpc.TiFlashCompute
+		}
 		return tikvrpc.TiFlash
 	case kv.TiDB:
 		return tikvrpc.TiDB
diff --git a/util/engine/engine.go b/util/engine/engine.go
index 68c369154888e..0a1614041f3bc 100644
--- a/util/engine/engine.go
+++ b/util/engine/engine.go
@@ -21,7 +21,7 @@ import (
 // IsTiFlash tests whether the store is based on tiflash engine.
 func IsTiFlash(store *metapb.Store) bool {
 	for _, label := range store.Labels {
-		if label.Key == "engine" && label.Value == "tiflash" {
+		if label.Key == "engine" && (label.Value == "tiflash_compute" || label.Value == "tiflash") {
 			return true
 		}
 	}
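---
A minimal, self-contained sketch (not part of the patch itself) of the consistent-hash
routing that buildBatchCopTasksConsistentHash above relies on, using the same
stathat/consistent Add/Get API added in go.mod; the node addresses below are made-up
placeholders:

	package main

	import (
		"fmt"

		"github.com/stathat/consistent"
	)

	func main() {
		hasher := consistent.New()
		// Register the available tiflash_compute nodes (placeholder addresses).
		for _, computeAddr := range []string{"10.0.0.1:3930", "10.0.0.2:3930"} {
			hasher.Add(computeAddr)
		}
		// Each batchCopTask, keyed by its original storage address, is routed to a
		// deterministic compute node, mirroring buildBatchCopTasksConsistentHash.
		for _, storageAddr := range []string{"10.0.1.1:20170", "10.0.1.2:20170"} {
			computeAddr, err := hasher.Get(storageAddr)
			if err != nil {
				panic(err)
			}
			fmt.Println(storageAddr, "->", computeAddr)
		}
	}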