Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: some cherry-pick from pd-cse #6477

Merged
merged 10 commits into from
Jul 3, 2023
2 changes: 0 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ func (manager *Manager) LoadKeyspace(name string) (*keyspacepb.KeyspaceMeta, err
if meta == nil {
return ErrKeyspaceNotFound
}
meta.Id = id
return nil
})
return meta, err
Expand All @@ -390,7 +389,6 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe
}
return nil
})
meta.Id = spaceID
return meta, err
}

Expand Down
35 changes: 25 additions & 10 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func keyspaceIDHash(id uint32) uint32 {
return id & 0xFF
}

// makeKeyRanges encodes keyspace ID to correct LabelRule data.
// RegionBound represents the region boundary of the given keyspace.
// For a keyspace with id ['a', 'b', 'c'], it has four boundaries:
//
// Lower bound for raw mode: ['r', 'a', 'b', 'c']
Expand All @@ -126,23 +126,38 @@ func keyspaceIDHash(id uint32) uint32 {
// And shares upper bound with keyspace with id ['a', 'b', 'c + 1'].
// These repeated bound will not cause any problem, as repetitive bound will be ignored during rangeListBuild,
// but provides guard against hole in keyspace allocations should it occur.
func makeKeyRanges(id uint32) []interface{} {
type RegionBound struct {
RawLeftBound []byte
RawRightBound []byte
TxnLeftBound []byte
TxnRightBound []byte
}

// MakeRegionBound constructs the correct region boundaries of the given keyspace.
func MakeRegionBound(id uint32) *RegionBound {
keyspaceIDBytes := make([]byte, 4)
nextKeyspaceIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(keyspaceIDBytes, id)
binary.BigEndian.PutUint32(nextKeyspaceIDBytes, id+1)
rawLeftBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'r'}, keyspaceIDBytes[1:]...)))
rawRightBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'r'}, nextKeyspaceIDBytes[1:]...)))
txnLeftBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'x'}, keyspaceIDBytes[1:]...)))
txnRightBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'x'}, nextKeyspaceIDBytes[1:]...)))
return &RegionBound{
RawLeftBound: codec.EncodeBytes(append([]byte{'r'}, keyspaceIDBytes[1:]...)),
RawRightBound: codec.EncodeBytes(append([]byte{'r'}, nextKeyspaceIDBytes[1:]...)),
TxnLeftBound: codec.EncodeBytes(append([]byte{'x'}, keyspaceIDBytes[1:]...)),
TxnRightBound: codec.EncodeBytes(append([]byte{'x'}, nextKeyspaceIDBytes[1:]...)),
}
}

// makeKeyRanges encodes keyspace ID to correct LabelRule data.
func makeKeyRanges(id uint32) []interface{} {
regionBound := MakeRegionBound(id)
return []interface{}{
map[string]interface{}{
"start_key": rawLeftBound,
"end_key": rawRightBound,
"start_key": hex.EncodeToString(regionBound.RawLeftBound),
"end_key": hex.EncodeToString(regionBound.RawRightBound),
},
map[string]interface{}{
"start_key": txnLeftBound,
"end_key": txnRightBound,
"start_key": hex.EncodeToString(regionBound.TxnLeftBound),
"end_key": hex.EncodeToString(regionBound.TxnRightBound),
},
}
}
Expand Down
51 changes: 51 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/pkg/keyspace"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -398,6 +399,56 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

// @Tags region
// @Summary List regions belongs to the given keyspace ID.
// @Param keyspace_id query string true "Keyspace ID"
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/keyspace/id/{id} [get]
func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
vars := mux.Vars(r)
keyspaceIDStr := vars["id"]
if keyspaceIDStr == "" {
h.rd.JSON(w, http.StatusBadRequest, "keyspace id is empty")
return
}

keyspaceID64, err := strconv.ParseUint(keyspaceIDStr, 10, 32)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
keyspaceID := uint32(keyspaceID64)
keyspaceManager := h.svr.GetKeyspaceManager()
if _, err := keyspaceManager.LoadKeyspaceByID(keyspaceID); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

limit := defaultRegionLimit
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err = strconv.Atoi(limitStr)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}
if limit > maxRegionLimit {
limit = maxRegionLimit
}
regionBound := keyspace.MakeRegionBound(keyspaceID)
regions := rc.ScanRegions(regionBound.RawLeftBound, regionBound.RawRightBound, limit)
if limit <= 0 || limit > len(regions) {
txnRegion := rc.ScanRegions(regionBound.TxnLeftBound, regionBound.TxnRightBound, limit-len(regions))
regions = append(regions, txnRegion...)
}
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

// @Tags region
// @Summary List all regions that miss peer.
// @Produce json
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/key", regionsHandler.ScanRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/count", regionsHandler.GetRegionCount, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/store/{id}", regionsHandler.GetStoreRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/keyspace/id/{id}", regionsHandler.GetKeyspaceRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/writeflow", regionsHandler.GetTopWriteFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/readflow", regionsHandler.GetTopReadFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/confver", regionsHandler.GetTopConfVerRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
8 changes: 7 additions & 1 deletion server/keyspace_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
putFn := func(kv *mvccpb.KeyValue) error {
meta := &keyspacepb.KeyspaceMeta{}
if err := proto.Unmarshal(kv.Value, meta); err != nil {
defer cancel() // cancel context to stop watcher
return err
}
keyspaces = append(keyspaces, meta)
Expand All @@ -92,9 +93,14 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
defer func() {
keyspaces = keyspaces[:0]
}()
return stream.Send(&keyspacepb.WatchKeyspacesResponse{
err := stream.Send(&keyspacepb.WatchKeyspacesResponse{
Header: s.header(),
Keyspaces: keyspaces})
if err != nil {
defer cancel() // cancel context to stop watcher
return err
}
return nil
}

watcher := etcdutil.NewLoopWatcher(
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,7 @@ func (s *Server) initTSOPrimaryWatcher() {
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update tso primary", zap.String("primary", listenUrls[0]))
}
return nil
}
Expand Down
39 changes: 39 additions & 0 deletions tools/pd-ctl/pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
regionsKeyPrefix = "pd/api/v1/regions/key"
regionsSiblingPrefix = "pd/api/v1/regions/sibling"
regionsRangeHolesPrefix = "pd/api/v1/regions/range-holes"
regionsKeyspacePrefix = "pd/api/v1/regions/keyspace"
regionIDPrefix = "pd/api/v1/region/id"
regionKeyPrefix = "pd/api/v1/region/key"
)
Expand All @@ -60,6 +61,7 @@ func NewRegionCommand() *cobra.Command {
r.AddCommand(NewRegionWithCheckCommand())
r.AddCommand(NewRegionWithSiblingCommand())
r.AddCommand(NewRegionWithStoreCommand())
r.AddCommand(NewRegionWithKeyspaceCommand())
r.AddCommand(NewRegionsByKeysCommand())
r.AddCommand(NewRangesWithRangeHolesCommand())

Expand Down Expand Up @@ -463,6 +465,43 @@ func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) {
cmd.Println(r)
}

// NewRegionWithKeyspaceCommand returns regions with keyspace subcommand of regionCmd
func NewRegionWithKeyspaceCommand() *cobra.Command {
r := &cobra.Command{
Use: "keyspace <subcommand>",
Short: "show region information of the given keyspace",
}
r.AddCommand(&cobra.Command{
Use: "id <keyspace_id> <limit>",
Short: "show region information for the given keyspace id",
Run: showRegionWithKeyspaceCommandFunc,
})
return r
}

func showRegionWithKeyspaceCommandFunc(cmd *cobra.Command, args []string) {
if len(args) < 1 || len(args) > 2 {
cmd.Println(cmd.UsageString())
return
}

keyspaceID := args[0]
prefix := regionsKeyspacePrefix + "/id/" + keyspaceID
if len(args) == 2 {
if _, err := strconv.Atoi(args[1]); err != nil {
cmd.Println("limit should be a number")
return
}
prefix += "?limit=" + args[1]
}
r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{})
if err != nil {
cmd.Printf("Failed to get regions with the given keyspace: %s\n", err)
return
}
cmd.Println(r)
}

const (
rangeHolesLongDesc = `There are some cases that the region range is not continuous, for example, the region doesn't send the heartbeat to PD after a splitting.
This command will output all empty ranges without any region info.`
Expand Down