Skip to content

Commit

Permalink
*: support disaggregated tiflash (#33535)
Browse files Browse the repository at this point in the history
close #34707
  • Loading branch information
guo-shaoge committed Dec 8, 2022
1 parent 631f32d commit 4c04abc
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 26 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
Expand Down Expand Up @@ -988,6 +989,7 @@ var defaultConf = Config{
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
DisaggregatedTiFlash: false,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
}
Expand Down
64 changes: 62 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/pingcap/tidb/util/memoryusagealarm"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -1416,6 +1417,64 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
return nil
}

// WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed.
// TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done)
//
// store cache will only be invalidated every 30 seconds.
func (do *Domain) WatchTiFlashComputeNodeChange() error {
var watchCh clientv3.WatchChan
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
}
do.wg.Add(1)
duration := 10 * time.Second
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("WatchTiFlashComputeNodeChange exit")
util.Recover(metrics.LabelDomain, "WatchTiFlashComputeNodeChange", nil, false)
}()

var count int
var logCount int
for {
ok := true
var watched bool
select {
case <-do.exit:
return
case _, ok = <-watchCh:
watched = true
case <-time.After(duration):
}
if !ok {
logutil.BgLogger().Error("WatchTiFlashComputeNodeChange watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
count++
if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
}
continue
}
count = 0
switch s := do.store.(type) {
case tikv.Storage:
logCount++
s.GetRegionCache().InvalidateTiFlashComputeStores()
if logCount == 60 {
// Print log every 60*duration seconds.
logutil.BgLogger().Debug("tiflash_compute store cache invalied, will update next query", zap.Bool("watched", watched))
logCount = 0
}
default:
logutil.BgLogger().Debug("No need to watch tiflash_compute store cache for non-tikv store")
return
}
}
}()
return nil
}

// PrivilegeHandle returns the MySQLPrivilege.
func (do *Domain) PrivilegeHandle() *privileges.Handle {
return do.privHandle
Expand Down Expand Up @@ -2078,8 +2137,9 @@ func (do *Domain) ServerMemoryLimitHandle() *servermemorylimit.Handle {
}

const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
tiflashComputeNodeKey = "/tiflash/new_tiflash_compute_nodes"
)

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
Expand Down
25 changes: 25 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/auth"
Expand Down Expand Up @@ -1278,3 +1279,27 @@ func TestBindingFromHistoryWithTiFlashBindable(t *testing.T) {
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with tiflash engine")
}

func TestDisaggregatedTiFlash(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
require.Contains(t, err.Error(), "Please check tiflash_compute node is available")

config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})
tk.MustQuery("select * from t;").Check(testkit.Rows())
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
Expand Down Expand Up @@ -218,7 +219,6 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
1 change: 1 addition & 0 deletions planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ go_library(
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
"//store/driver/backoff",
"//table",
"//table/tables",
"//table/temptable",
Expand Down
4 changes: 3 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -1985,7 +1986,8 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
}
}
if prop.TaskTp == property.MppTaskType {
// In disaggregated tiflash mode, only MPP is allowed, Cop and BatchCop is deprecated.
if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
if ts.KeepOrder {
return invalidTask, nil
}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
Expand All @@ -65,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/size"
"github.com/tikv/client-go/v2/tikv"
)

const (
Expand Down Expand Up @@ -685,6 +688,13 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
ds.preferStoreType = 0
return
}
if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) {
// TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available.
errMsg := "No available tiflash_compute node"
warning := ErrInternal.GenWithStack(errMsg)
ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
return
}
for _, path := range ds.possibleAccessPaths {
if path.StoreType == kv.TiFlash {
ds.preferStoreType |= preferTiFlash
Expand All @@ -702,6 +712,15 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
}
}

func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool {
bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil)
stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil || len(stores) == 0 {
return false
}
return true
}

func resetNotNullFlag(schema *expression.Schema, start, end int) {
for i := start; i < end; i++ {
col := *schema.Columns[i]
Expand Down
19 changes: 17 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,15 +1392,26 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
var outputComputeNodeErrMsg bool
noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx)
for i := len(paths) - 1; i >= 0; i-- {
// availableEngineStr is for warning message.
if _, ok := availableEngine[paths[i].StoreType]; !ok {
availableEngine[paths[i].StoreType] = struct{}{}
if availableEngineStr != "" {
availableEngineStr += ", "
}
availableEngineStr += paths[i].StoreType.Name()
}
if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB {
_, exists := isolationReadEngines[paths[i].StoreType]
// Prune this path if:
// 1. path.StoreType doesn't exists in isolationReadEngines or
// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
if shouldPruneTiFlashCompute {
outputComputeNodeErrMsg = true
}
if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute {
paths = append(paths[:i], paths[i+1:]...)
}
}
Expand All @@ -1409,7 +1420,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
if len(paths) == 0 {
helpMsg := ""
if engineVals == "tiflash" {
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
if outputComputeNodeErrMsg {
helpMsg = ". Please check tiflash_compute node is available"
} else {
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
}
}
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(),
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg))
Expand Down
4 changes: 4 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,10 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
attachPlan2Task(proj, newMpp)
return newMpp
case NoMpp:
t = mpp.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
return t
default:
return invalidTask
}
Expand Down
8 changes: 8 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3018,6 +3018,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

if config.GetGlobalConfig().DisaggregatedTiFlash {
// Invalid client-go tiflash_compute store cache if necessary.
err = dom.WatchTiFlashComputeNodeChange()
if err != nil {
return nil, err
}
}

if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stathat_consistent//:consistent",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
Expand Down Expand Up @@ -75,6 +76,7 @@ go_test(
"//testkit/testsetup",
"//util/paging",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
Expand Down
Loading

0 comments on commit 4c04abc

Please sign in to comment.