Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#38571
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hehechen committed Jan 6, 2023
1 parent 009bd88 commit 56fc651
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 57 deletions.
6 changes: 3 additions & 3 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,11 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex

logutil.BgLogger().Debug("CollectTiFlashStatus", zap.Any("regionReplica", regionReplica), zap.Int64("tableID", tb.ID))

var stats helper.PDRegionStats
if err := infosync.GetTiFlashPDRegionRecordStats(context.Background(), tb.ID, &stats); err != nil {
var regionCount int
if err := infosync.GetTiFlashRegionCountFromPD(context.Background(), tb.ID, &regionCount); err != nil {
logutil.BgLogger().Error("Fail to get regionCount from PD.", zap.Int64("tableID", tb.ID))
return allReplicaReady, err
}
regionCount := stats.Count
flashRegionCount := len(regionReplica)
avail := regionCount == flashRegionCount
failpoint.Inject("PollTiFlashReplicaStatusReplaceCurAvailableValue", func(val failpoint.Value) {
Expand Down
6 changes: 3 additions & 3 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,13 +1024,13 @@ func PostTiFlashAccelerateSchedule(ctx context.Context, tableID int64) error {
return is.tiflashPlacementManager.PostAccelerateSchedule(ctx, tableID)
}

// GetTiFlashPDRegionRecordStats is a helper function calling `/stats/region`.
func GetTiFlashPDRegionRecordStats(ctx context.Context, tableID int64, stats *helper.PDRegionStats) error {
// GetTiFlashRegionCountFromPD is a helper function calling `/stats/region`.
func GetTiFlashRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return errors.Trace(err)
}
return is.tiflashPlacementManager.GetPDRegionRecordStats(ctx, tableID, stats)
return is.tiflashPlacementManager.GetRegionCountFromPD(ctx, tableID, regionCount)
}

// GetTiFlashStoresStat gets the TiKV store information by accessing PD's api.
Expand Down
33 changes: 15 additions & 18 deletions domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type TiFlashPlacementManager interface {
GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error)
// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
PostAccelerateSchedule(ctx context.Context, tableID int64) error
// GetPDRegionRecordStats is a helper function calling `/stats/region`.
GetPDRegionRecordStats(ctx context.Context, tableID int64, stats *helper.PDRegionStats) error
// GetRegionCountFromPD is a helper function calling `/stats/region`.
GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error
// GetStoresStat gets the TiKV store information by accessing PD's api.
GetStoresStat(ctx context.Context) (*helper.StoresStat, error)
// Close is to close TiFlashPlacementManager
Expand Down Expand Up @@ -200,28 +200,29 @@ func (m *TiFlashPDPlacementManager) PostAccelerateSchedule(ctx context.Context,
return nil
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
func (m *TiFlashPDPlacementManager) GetPDRegionRecordStats(ctx context.Context, tableID int64, stats *helper.PDRegionStats) error {
// GetRegionCountFromPD is a helper function calling `/stats/region`.
func (m *TiFlashPDPlacementManager) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error {
startKey := tablecodec.GenTableRecordPrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)

p := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s",
p := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s&count",
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))
res, err := doRequest(ctx, "GetPDRegionStats", m.etcdCli.Endpoints(), p, "GET", nil)
if err != nil {
return errors.Trace(err)
}
if res == nil {
return fmt.Errorf("TiFlashPDPlacementManager returns error in GetPDRegionRecordStats")
return fmt.Errorf("TiFlashPDPlacementManager returns error in GetRegionCountFromPD")
}

err = json.Unmarshal(res, stats)
var stats helper.PDRegionStats
err = json.Unmarshal(res, &stats)
if err != nil {
return errors.Trace(err)
}
*regionCount = stats.Count
return nil
}

Expand Down Expand Up @@ -457,16 +458,11 @@ func (tiflash *MockTiFlash) HandlePostAccelerateSchedule(endKey string) error {
return nil
}

// HandleGetPDRegionRecordStats is mock function for GetPDRegionRecordStats.
// HandleGetPDRegionRecordStats is mock function for GetRegionCountFromPD.
// It currently always returns 1 Region for convenience.
func (tiflash *MockTiFlash) HandleGetPDRegionRecordStats(_ int64) helper.PDRegionStats {
return helper.PDRegionStats{
Count: 1,
EmptyCount: 1,
StorageSize: 1,
StorageKeys: 1,
StoreLeaderCount: map[uint64]int{1: 1},
StorePeerCount: map[uint64]int{1: 1},
Count: 1,
}
}

Expand Down Expand Up @@ -645,14 +641,15 @@ func (m *mockTiFlashPlacementManager) PostAccelerateSchedule(ctx context.Context
return m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey))
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
func (m *mockTiFlashPlacementManager) GetPDRegionRecordStats(ctx context.Context, tableID int64, stats *helper.PDRegionStats) error {
// GetRegionCountFromPD is a helper function calling `/stats/region`.
func (m *mockTiFlashPlacementManager) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error {
m.Lock()
defer m.Unlock()
if m.tiflash == nil {
return nil
}
*stats = m.tiflash.HandleGetPDRegionRecordStats(tableID)
stats := m.tiflash.HandleGetPDRegionRecordStats(tableID)
*regionCount = stats.Count
return nil
}

Expand Down
33 changes: 0 additions & 33 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,39 +1142,6 @@ func (h *Helper) PostAccelerateSchedule(tableID int64) error {
return nil
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
func (h *Helper) GetPDRegionRecordStats(tableID int64, stats *PDRegionStats) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}

startKey := tablecodec.GenTableRecordPrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)

statURL := fmt.Sprintf("%s://%s/pd/api/v1/stats/region?start_key=%s&end_key=%s",
util.InternalHTTPSchema(),
pdAddrs[0],
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))

resp, err := util.InternalHTTPClient().Get(statURL)
if err != nil {
return errors.Trace(err)
}
defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

dec := json.NewDecoder(resp.Body)

return dec.Decode(stats)
}

// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey.
func GetTiFlashTableIDFromEndKey(endKey string) int64 {
e, _ := hex.DecodeString(endKey)
Expand Down

0 comments on commit 56fc651

Please sign in to comment.