Skip to content

Commit

Permalink
ddl: add helper function to set and query TiFlash's sync status (#30473)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo committed Dec 17, 2021
1 parent 321d307 commit 1cf2a6d
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 37 deletions.
37 changes: 37 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -731,3 +732,39 @@ func init() {
RunInGoTest = true
}
}

// GetDropOrTruncateTableInfoFromJobsByStore scans DDL jobs for drop-table and
// truncate-table operations and invokes fn on each table it can resolve.
// Every job's start_ts is first validated against gcSafePoint, because the
// snapshot info-schema at start_ts is needed for the lookup. getTable maps
// (startTS, schemaID, tableID) to the table's meta; jobs whose table cannot
// be found (ErrDBNotExists or a nil result) are skipped, since a parallel DDL
// may have removed the table from the snapshot info-schema — see
// TestParallelDropSchemaAndDropTable. Iteration stops as soon as fn reports
// it is finished or returns an error.
func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
	for _, job := range jobs {
		// The GC safe point is checked for every job, even ones that are
		// filtered out below, matching the order of operations callers rely on.
		if err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint); err != nil {
			return false, err
		}
		isDropOrTruncate := job.Type == model.ActionDropTable || job.Type == model.ActionTruncateTable
		if !isDropOrTruncate {
			continue
		}

		tblInfo, err := getTable(job.StartTS, job.SchemaID, job.TableID)
		if err != nil && !meta.ErrDBNotExists.Equal(err) {
			return false, err
		}
		if err != nil || tblInfo == nil {
			// The dropped/truncated DDL may have failed because of parallel
			// DDL execution, so the table is absent from the snapshot
			// info-schema. Ignore it — see TestParallelDropSchemaAndDropTable.
			continue
		}
		if finish, err := fn(job, tblInfo); err != nil || finish {
			return finish, err
		}
	}
	return false, nil
}
39 changes: 6 additions & 33 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,42 +710,15 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta,
// GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs,
// it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
for _, job := range jobs {
// Check GC safe point for getting snapshot infoSchema.
err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
snapMeta, err := dom.GetSnapshotMeta(StartTS)
if err != nil {
return false, err
}
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
continue
}

snapMeta, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return false, err
}
tbl, err := snapMeta.GetTable(job.SchemaID, job.TableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
return false, err
}
if tbl == nil {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
finish, err := fn(job, tbl)
if err != nil || finish {
return finish, err
return nil, err
}
tbl, err := snapMeta.GetTable(SchemaID, TableID)
return tbl, err
}
return false, nil
return ddl.GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, fn)
}

func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.Job, *model.TableInfo, error) {
Expand Down
239 changes: 235 additions & 4 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package helper

import (
"bufio"
"bytes"
"context"
"encoding/hex"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -875,7 +876,7 @@ type PDRegionStats struct {
func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return err
return errors.Trace(err)
}

startKey := tablecodec.EncodeTablePrefix(tableID)
Expand All @@ -891,16 +892,246 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error {

resp, err := util.InternalHTTPClient().Get(statURL)
if err != nil {
return err
return errors.Trace(err)
}
defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

dec := json.NewDecoder(resp.Body)

return dec.Decode(stats)
}

// DeletePlacementRule is to delete placement rule for certain group.
// It issues a DELETE to PD's `/pd/api/v1/config/rule/{group}/{ruleID}`
// endpoint and treats any non-200 response as failure.
func (h *Helper) DeletePlacementRule(group string, ruleID string) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}

	deleteURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule/%v/%v",
		util.InternalHTTPSchema(),
		pdAddrs[0],
		group,
		ruleID,
	)

	// Use the named method constant instead of the raw "DELETE" literal.
	req, err := http.NewRequest(http.MethodDelete, deleteURL, nil)
	if err != nil {
		return errors.Trace(err)
	}

	resp, err := util.InternalHTTPClient().Do(req)
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()
	if resp.StatusCode != http.StatusOK {
		return errors.New("DeletePlacementRule returns error")
	}
	return nil
}

// SetPlacementRule is a helper function to set placement rule.
// The rule is POSTed to PD's `/pd/api/v1/config/rule` endpoint as JSON;
// any non-200 response is reported as an error.
func (h *Helper) SetPlacementRule(rule placement.Rule) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}
	// Do not silently drop marshalling failures: a rule that cannot be
	// encoded must not be sent to PD as an empty body.
	m, err := json.Marshal(rule)
	if err != nil {
		return errors.Trace(err)
	}

	postURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule",
		util.InternalHTTPSchema(),
		pdAddrs[0],
	)
	buf := bytes.NewBuffer(m)
	resp, err := util.InternalHTTPClient().Post(postURL, "application/json", buf)
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()
	if resp.StatusCode != http.StatusOK {
		return errors.New("SetPlacementRule returns error")
	}
	return nil
}

// GetGroupRules to get all placement rule in a certain group.
// It queries PD's `/pd/api/v1/config/rules/group/{group}` endpoint and
// decodes the JSON response into a slice of placement rules.
func (h *Helper) GetGroupRules(group string) ([]placement.Rule, error) {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return nil, errors.Trace(err)
	}

	getURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rules/group/%s",
		util.InternalHTTPSchema(), pdAddrs[0], group)

	resp, err := util.InternalHTTPClient().Get(getURL)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			logutil.BgLogger().Error("err", zap.Error(closeErr))
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return nil, errors.New("GetGroupRules returns error")
	}

	// Read the whole body first, then unmarshal it in a single step.
	var body bytes.Buffer
	if _, err = body.ReadFrom(resp.Body); err != nil {
		return nil, errors.Trace(err)
	}

	var rules []placement.Rule
	if err = json.Unmarshal(body.Bytes(), &rules); err != nil {
		return nil, errors.Trace(err)
	}
	return rules, nil
}

// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
// It asks PD to speed up region scheduling for the key range covering the
// given table's record keys.
func (h *Helper) PostAccelerateSchedule(tableID int64) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}
	// Range is [record prefix of tableID, table prefix of tableID+1),
	// encoded the same way keys are stored in PD.
	startKey := tablecodec.GenTableRecordPrefix(tableID)
	endKey := tablecodec.EncodeTablePrefix(tableID + 1)
	startKey = codec.EncodeBytes([]byte{}, startKey)
	endKey = codec.EncodeBytes([]byte{}, endKey)

	postURL := fmt.Sprintf("%s://%s/pd/api/v1/regions/accelerate-schedule",
		util.InternalHTTPSchema(),
		pdAddrs[0])

	input := map[string]string{
		"start_key": url.QueryEscape(string(startKey)),
		"end_key":   url.QueryEscape(string(endKey)),
	}
	v, err := json.Marshal(input)
	if err != nil {
		return errors.Trace(err)
	}
	resp, err := util.InternalHTTPClient().Post(postURL, "application/json", bytes.NewBuffer(v))
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()
	return nil
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
// It fetches PD's region statistics for the record-key range of the given
// table and decodes the JSON response into stats.
func (h *Helper) GetPDRegionRecordStats(tableID int64, stats *PDRegionStats) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}

	// Cover [record prefix of tableID, table prefix of tableID+1).
	rawStart := codec.EncodeBytes([]byte{}, tablecodec.GenTableRecordPrefix(tableID))
	rawEnd := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1))

	statURL := fmt.Sprintf("%s://%s/pd/api/v1/stats/region?start_key=%s&end_key=%s",
		util.InternalHTTPSchema(),
		pdAddrs[0],
		url.QueryEscape(string(rawStart)),
		url.QueryEscape(string(rawEnd)))

	resp, err := util.InternalHTTPClient().Get(statURL)
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			logutil.BgLogger().Error("err", zap.Error(closeErr))
		}
	}()

	return json.NewDecoder(resp.Body).Decode(stats)
}

// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey.
// The endKey is URL-escaped and codec-encoded; a rule's range ends at the
// prefix of the next table, so the owning table is that ID minus one.
// Malformed input is tolerated: decode errors are deliberately ignored and
// whatever could be parsed is used.
func GetTiFlashTableIDFromEndKey(endKey string) int64 {
	unescaped, _ := url.QueryUnescape(endKey)
	_, rawKey, _ := codec.DecodeBytes([]byte(unescaped), []byte{})
	return tablecodec.DecodeTableID(rawKey) - 1
}

// ComputeTiFlashStatus is helper function for CollectTiFlashStatus.
// The expected input is a line-oriented sync-status payload: the first line
// holds the number of regions n, followed by n lines each holding a region
// ID. Every region ID seen increments its replica count in regionReplica.
// A line that cannot be parsed as an integer aborts with an error.
func ComputeTiFlashStatus(reader *bufio.Reader, regionReplica *map[int64]int) error {
	ns, _, _ := reader.ReadLine()
	n, err := strconv.ParseInt(string(ns), 10, 64)
	if err != nil {
		return errors.Trace(err)
	}
	for i := int64(0); i < n; i++ {
		rs, _, _ := reader.ReadLine()
		// For (`table`, `store`), has region `r`.
		r, err := strconv.ParseInt(strings.Trim(string(rs), "\r\n \t"), 10, 32)
		if err != nil {
			return errors.Trace(err)
		}
		// A missing key reads as zero, so no exists-check is needed before
		// incrementing.
		(*regionReplica)[r]++
	}
	return nil
}

// CollectTiFlashStatus query sync status of one table from TiFlash store.
// `regionReplica` is a map from RegionID to count of TiFlash Replicas in this region.
func CollectTiFlashStatus(statusAddress string, tableID int64, regionReplica *map[int64]int) error {
	statURL := fmt.Sprintf("%s://%s/tiflash/sync-status/%d",
		util.InternalHTTPSchema(),
		statusAddress,
		tableID,
	)
	resp, err := util.InternalHTTPClient().Get(statURL)
	if err != nil {
		return errors.Trace(err)
	}

	defer func() {
		err = resp.Body.Close()
		if err != nil {
			logutil.BgLogger().Error("close body failed", zap.Error(err))
		}
	}()

	// Fail fast on a non-200 response instead of handing an error page to
	// the line parser, which would otherwise surface as a confusing
	// ParseInt failure.
	if resp.StatusCode != http.StatusOK {
		return errors.Errorf("CollectTiFlashStatus returns status code %d", resp.StatusCode)
	}

	reader := bufio.NewReader(resp.Body)
	if err = ComputeTiFlashStatus(reader, regionReplica); err != nil {
		return errors.Trace(err)
	}
	return nil
}
Loading

0 comments on commit 1cf2a6d

Please sign in to comment.