Skip to content

Commit

Permalink
*: add service gc safepoint commands (#2797)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored Aug 21, 2020
1 parent c200bdb commit d40341c
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 15 deletions.
14 changes: 13 additions & 1 deletion pkg/tsoutil/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package tsoutil

import "time"
import (
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
)

const (
physicalShiftBits = 18
Expand All @@ -27,3 +31,11 @@ func ParseTS(ts uint64) (time.Time, uint64) {
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}

// ParseTimestamp parses pdpb.Timestamp to time.Time
func ParseTimestamp(ts pdpb.Timestamp) (time.Time, uint64) {
logical := uint64(ts.Logical)
physical := ts.Physical
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}
5 changes: 5 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.R
apiRouter.Handle("/debug/pprof/block", pprof.Handler("block"))
apiRouter.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))

// service GC safepoint API
serviceGCSafepointHandler := newServiceGCSafepointHandler(svr, rd)
apiRouter.HandleFunc("/gc/safepoint", serviceGCSafepointHandler.List).Methods("GET")
apiRouter.HandleFunc("/gc/safepoint/{service_id}", serviceGCSafepointHandler.Delete).Methods("DELETE")

// Deprecated
rootRouter.Handle("/health", newHealthHandler(svr, rd)).Methods("GET")
// Deprecated
Expand Down
84 changes: 84 additions & 0 deletions server/api/service_gc_safepoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/gorilla/mux"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
"github.com/unrolled/render"
)

type serviceGCSafepointHandler struct {
svr *server.Server
rd *render.Render
}

func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *serviceGCSafepointHandler {
return &serviceGCSafepointHandler{
svr: svr,
rd: rd,
}
}

type listServiceGCSafepoint struct {
ServiceGCSafepoints []*core.ServiceSafePoint `json:"service_gc_safe_points"`
GCSafePoint uint64 `json:"gc_safe_point"`
}

// @Tags servicegcsafepoint
// @Summary Get all service GC safepoint.
// @Produce json
// @Success 200 {array} listServiceGCSafepoint
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /gc/safepoint [get]
func (h *serviceGCSafepointHandler) List(w http.ResponseWriter, r *http.Request) {
storage := h.svr.GetStorage()
gcSafepoint, err := storage.LoadGCSafePoint()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
ssps, err := storage.GetAllServiceGCSafePoints()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
list := listServiceGCSafepoint{
GCSafePoint: gcSafepoint,
ServiceGCSafepoints: ssps,
}
h.rd.JSON(w, http.StatusOK, list)
}

// @Tags servicegcsafepoint
// @Summary Delete a service GC safepoint.
// @Param service_id path string true "Service ID"
// @Produce json
// @Success 200 {string} string "Delete service GC safepoint successfully."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /gc/safepoint/{service_id} [delete]
// @Tags rule
func (h *serviceGCSafepointHandler) Delete(w http.ResponseWriter, r *http.Request) {
storage := h.svr.GetStorage()
serviceID := mux.Vars(r)["service_id"]
err := storage.RemoveServiceGCSafePoint(serviceID)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, "Delete service GC safepoint successfully.")
}
95 changes: 95 additions & 0 deletions server/api/service_gc_safepoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"
"net/http"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
)

var _ = Suite(&testServiceGCSafepointSuite{})

type testServiceGCSafepointSuite struct {
svr *server.Server
cleanup cleanUpFunc
urlPrefix string
}

func (s *testServiceGCSafepointSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
mustPutStore(c, s.svr, 1, metapb.StoreState_Up, nil)
}

func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) {
sspURL := s.urlPrefix + "/gc/safepoint"

storage := s.svr.GetStorage()
list := &listServiceGCSafepoint{
ServiceGCSafepoints: []*core.ServiceSafePoint{
{
ServiceID: "a",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 1,
},
{
ServiceID: "b",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 2,
},
{
ServiceID: "c",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 3,
},
},
GCSafePoint: 1,
}
for _, ssp := range list.ServiceGCSafepoints {
err := storage.SaveServiceGCSafePoint(ssp)
c.Assert(err, IsNil)
}
storage.SaveGCSafePoint(1)

res, err := testDialClient.Get(sspURL)
c.Assert(err, IsNil)
listResp := &listServiceGCSafepoint{}
err = apiutil.ReadJSON(res.Body, listResp)
c.Assert(err, IsNil)
c.Assert(listResp, DeepEquals, list)

res, err = doDelete(testDialClient, sspURL+"/a")
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, http.StatusOK)

left, err := storage.GetAllServiceGCSafePoints()
c.Assert(err, IsNil)
c.Assert(left, DeepEquals, list.ServiceGCSafepoints[1:])
}
35 changes: 29 additions & 6 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ func (s *Storage) LoadGCSafePoint() (uint64, error) {

// ServiceSafePoint is the safepoint for a specific service
type ServiceSafePoint struct {
ServiceID string
ExpiredAt int64
SafePoint uint64
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// SaveServiceGCSafePoint saves a GC safepoint for the service
Expand All @@ -454,7 +454,7 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
}

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) {
func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service") + "/"
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := s.LoadRange(prefix, prefixEnd, 0)
Expand All @@ -466,13 +466,12 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) {
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
now := time.Now().Unix()
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ExpiredAt < now {
if ssp.ExpiredAt < now.Unix() {
s.Remove(key)
continue
}
Expand All @@ -484,6 +483,30 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) {
return min, nil
}

// GetAllServiceGCSafePoints returns all services GC safepoints
func (s *Storage) GetAllServiceGCSafePoints() ([]*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service") + "/"
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := s.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
if len(keys) == 0 {
return []*ServiceSafePoint{}, nil
}

ssps := make([]*ServiceSafePoint, 0, len(keys))
for i := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
ssps = append(ssps, ssp)
}

return ssps, nil
}

// LoadAllScheduleConfig loads all schedulers' config.
func (s *Storage) LoadAllScheduleConfig() ([]string, []string, error) {
prefix := customScheduleConfigPath + "/"
Expand Down
2 changes: 1 addition & 1 deletion server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) {
c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil)
}

ssp, err := storage.LoadMinServiceGCSafePoint()
ssp, err := storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "2")
c.Assert(ssp.ExpiredAt, Equals, expireAt)
Expand Down
12 changes: 8 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,15 +775,19 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

min, err := s.storage.LoadMinServiceGCSafePoint()
now, err := s.tso.Now()
if err != nil {
return nil, err
}
min, err := s.storage.LoadMinServiceGCSafePoint(now)
if err != nil {
return nil, err
}

if request.TTL > 0 && request.SafePoint >= min.SafePoint {
ssp := &core.ServiceSafePoint{
ServiceID: string(request.ServiceId),
ExpiredAt: time.Now().Unix() + request.TTL,
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
Expand All @@ -795,7 +799,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
zap.Uint64("safepoint", ssp.SafePoint))
// If the min safepoint is updated, load the next one
if string(request.ServiceId) == min.ServiceID {
min, err = s.storage.LoadMinServiceGCSafePoint()
min, err = s.storage.LoadMinServiceGCSafePoint(now)
if err != nil {
return nil, err
}
Expand All @@ -809,7 +813,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
return &pdpb.UpdateServiceGCSafePointResponse{
Header: s.header(),
ServiceId: []byte(min.ServiceID),
TTL: min.ExpiredAt - time.Now().Unix(),
TTL: min.ExpiredAt - now.Unix(),
MinSafePoint: min.SafePoint,
}, nil
}
Expand Down
10 changes: 10 additions & 0 deletions server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,13 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
}
return resp, errors.New("can not get timestamp")
}

// Now returns the current tso time.
func (t *TimestampOracle) Now() (time.Time, error) {
resp, err := t.GetRespTS(1)
if err != nil {
return time.Time{}, err
}
tm, _ := tsoutil.ParseTimestamp(resp)
return tm, nil
}
6 changes: 3 additions & 3 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,15 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
c.Assert(min, Equals, uint64(3))

// Update only the TTL of the minimum safepoint
oldMinSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint()
oldMinSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(oldMinSsp.ServiceID, Equals, "c")
c.Assert(oldMinSsp.SafePoint, Equals, uint64(3))
min, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", 2000, 3)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(3))
minSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint()
minSsp, err := s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "c")
c.Assert(oldMinSsp.SafePoint, Equals, uint64(3))
Expand All @@ -695,7 +695,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
"c", 1, 3)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(3))
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint()
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "c")
c.Assert(oldMinSsp.SafePoint, Equals, uint64(3))
Expand Down
Loading

0 comments on commit d40341c

Please sign in to comment.