This is an automated cherry-pick of tikv#4798
ref tikv#4797

Signed-off-by: ti-chi-bot <[email protected]>
rleungx authored and ti-chi-bot committed Jun 22, 2022
1 parent 0b84f98 commit ebd2007
Showing 2 changed files with 346 additions and 2 deletions.
327 changes: 325 additions & 2 deletions server/grpc_service.go
@@ -42,16 +42,65 @@ import (
"google.golang.org/grpc/status"
)

<<<<<<< HEAD
const slowThreshold = 5 * time.Millisecond
=======
const (
	heartbeatSendTimeout = 5 * time.Second
	// store config
	storeReadyWaitTime = 5 * time.Second

	// tso
	maxMergeTSORequests    = 10000
	defaultTSOProxyTimeout = 3 * time.Second

	// global config
	globalConfigPath = "/global/config/"
)
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

// gRPC errors
var (
	// ErrNotLeader is returned when the current server is not the leader and cannot process the request.
	// TODO: work as proxy.
<<<<<<< HEAD
	ErrNotLeader  = status.Errorf(codes.Unavailable, "not leader")
	ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

=======
	ErrNotLeader            = status.Errorf(codes.Unavailable, "not leader")
	ErrNotStarted           = status.Errorf(codes.Unavailable, "server not started")
	ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
	*Server
}

type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)

func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
	failpoint.Inject("customTimeout", func() {
		time.Sleep(5 * time.Second)
	})
	forwardedHost := getForwardedHost(ctx)
	if !s.isLocalRequest(forwardedHost) {
		client, err := s.getDelegateClient(ctx, forwardedHost)
		if err != nil {
			return nil, err
		}
		ctx = grpcutil.ResetForwardContext(ctx)
		return fn(ctx, client)
	}
	if err := s.validateRequest(header); err != nil {
		return nil, err
	}
	return nil, nil
}

>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
// GetMembers implements gRPC PDServer.
func (s *Server) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
	if s.IsClosed() {
@@ -455,10 +504,46 @@ func (s *Server) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHeartbea
		Header:            s.header(),
		ReplicationStatus: rc.GetReplicationMode().GetReplicationStatus(),
		ClusterVersion:    rc.GetClusterVersion(),
<<<<<<< HEAD
	}, nil
}

const regionHeartbeatSendTimeout = 5 * time.Second
=======
	}
	if rc.GetUnsafeRecoveryController() != nil {
		rc.GetUnsafeRecoveryController().HandleStoreHeartbeat(request, resp)
	}
	return resp, nil
}

// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
	stream pdpb.PD_ReportBucketsServer
	closed int32
}

func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
	if atomic.LoadInt32(&b.closed) == 1 {
		return status.Errorf(codes.Canceled, "stream is closed")
	}
	done := make(chan error, 1)
	go func() {
		done <- b.stream.SendAndClose(bucket)
	}()
	select {
	case err := <-done:
		if err != nil {
			atomic.StoreInt32(&b.closed, 1)
		}
		return err
	case <-time.After(heartbeatSendTimeout):
		atomic.StoreInt32(&b.closed, 1)
		return ErrSendHeartbeatTimeout
	}
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")

@@ -479,6 +564,106 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
	case err := <-done:
		if err != nil {
			atomic.StoreInt32(&s.closed, 1)
<<<<<<< HEAD
=======
		}
		return errors.WithStack(err)
	case <-time.After(heartbeatSendTimeout):
		atomic.StoreInt32(&s.closed, 1)
		return ErrSendHeartbeatTimeout
	}
}

func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
	if atomic.LoadInt32(&s.closed) == 1 {
		return nil, io.EOF
	}
	req, err := s.stream.Recv()
	if err != nil {
		atomic.StoreInt32(&s.closed, 1)
		return nil, errors.WithStack(err)
	}
	return req, nil
}

// ReportBuckets implements gRPC PDServer
func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
	var (
		server            = &bucketHeartbeatServer{stream: stream}
		forwardStream     pdpb.PD_ReportBucketsClient
		cancel            context.CancelFunc
		lastForwardedHost string
		errCh             chan error
	)
	defer func() {
		if cancel != nil {
			cancel()
		}
	}()
	for {
		request, err := server.Recv()
		if err == io.EOF {
			return nil
		}
		forwardedHost := getForwardedHost(stream.Context())
		if !s.isLocalRequest(forwardedHost) {
			if forwardStream == nil || lastForwardedHost != forwardedHost {
				if cancel != nil {
					cancel()
				}
				client, err := s.getDelegateClient(s.ctx, forwardedHost)
				if err != nil {
					return err
				}
				log.Info("create bucket report forward stream", zap.String("forwarded-host", forwardedHost))
				forwardStream, cancel, err = s.createReportBucketsForwardStream(client)
				if err != nil {
					return err
				}
				lastForwardedHost = forwardedHost
				errCh = make(chan error, 1)
				go forwardReportBucketClientToServer(forwardStream, server, errCh)
			}
			if err := forwardStream.Send(request); err != nil {
				return errors.WithStack(err)
			}

			select {
			case err := <-errCh:
				return err
			default:
			}
			continue
		}
		rc := s.GetRaftCluster()
		if rc == nil {
			resp := &pdpb.ReportBucketsResponse{
				Header: s.notBootstrappedHeader(),
			}
			err := server.Send(resp)
			return errors.WithStack(err)
		}
		if err := s.validateRequest(request.GetHeader()); err != nil {
			return err
		}
		buckets := request.GetBuckets()
		if buckets == nil || len(buckets.Keys) == 0 {
			continue
		}
		store := rc.GetLeaderStoreByRegionID(buckets.GetRegionId())
		if store == nil {
			return errors.Errorf("the store of the bucket in region %v is not found ", buckets.GetRegionId())
		}
		storeLabel := strconv.FormatUint(store.GetID(), 10)
		storeAddress := store.GetAddress()
		bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()

		start := time.Now()
		err = rc.HandleReportBuckets(buckets)
		if err != nil {
			bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()
			continue
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
		}
		return errors.WithStack(err)
	case <-time.After(regionHeartbeatSendTimeout):
@@ -1243,7 +1428,7 @@ func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorReque
// TODO: Call it in gRPC interceptor.
func (s *Server) validateRequest(header *pdpb.RequestHeader) error {
	if s.IsClosed() || !s.member.IsLeader() {
		return errors.WithStack(ErrNotLeader)
		return ErrNotLeader
	}
	if header.GetClusterId() != s.clusterID {
		return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
@@ -1405,7 +1590,7 @@ func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocat
// the gRPC communication between PD servers internally.
func (s *Server) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
	if s.IsClosed() {
		return errors.WithStack(ErrNotStarted)
		return ErrNotStarted
	}
	// If onlyAllowLeader is true, check whether the sender is PD leader.
	if onlyAllowLeader {
@@ -1502,3 +1687,141 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
	}
	<-done
}
<<<<<<< HEAD
=======

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
	ops := make([]clientv3.Op, len(request.Changes))
	for i, item := range request.Changes {
		name := globalConfigPath + item.GetName()
		value := item.GetValue()
		ops[i] = clientv3.OpPut(name, value)
	}
	res, err :=
		kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
	if err != nil {
		return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}, err
	}
	if !res.Succeeded {
		return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: "failed to execute StoreGlobalConfig transaction"}}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
	}
	return &pdpb.StoreGlobalConfigResponse{}, err
}

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
	names := request.Names
	res := make([]*pdpb.GlobalConfigItem, len(names))
	for i, name := range names {
		r, err := s.client.Get(ctx, globalConfigPath+name)
		if err != nil {
			res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
		} else if len(r.Kvs) == 0 {
			msg := "key " + name + " not found"
			res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
		} else {
			res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
		}
	}
	return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

// WatchGlobalConfig: if the WatchGlobalConfig connection ends or is stopped
// for whatever reason, just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
	ctx, cancel := context.WithCancel(s.Context())
	defer cancel()
	err := s.sendAllGlobalConfig(ctx, server)
	if err != nil {
		return err
	}
	watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
	for {
		select {
		case <-ctx.Done():
			return nil
		case res := <-watchChan:
			cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
			for _, e := range res.Events {
				if e.Type != clientv3.EventTypePut {
					continue
				}
				cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
			}
			if len(cfgs) > 0 {
				err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
				if err != nil {
					return err
				}
			}
		}
	}
}

func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
	configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	ls := make([]*pdpb.GlobalConfigItem, configList.Count)
	for i, kv := range configList.Kvs {
		ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
	}
	err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
	return err
}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
// and require the user to manually remove the `evict-leader-scheduler` with pd-ctl
func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) error {
	// TODO: regions have no special process for the time being
	// and need to be removed in the future
	damagedRegions := stats.GetDamagedRegionsId()
	if len(damagedRegions) == 0 {
		return nil
	}

	log.Error("store damaged and leaders will be evicted, you might fix the store and remove evict-leader-scheduler manually",
		zap.Uint64("store-id", stats.GetStoreId()),
		zap.Uint64s("region-ids", damagedRegions))

	// TODO: reimplement add scheduler logic to avoid repeating the introduction HTTP requests inside `server/api`.
	return s.GetHandler().AddEvictOrGrant(float64(stats.GetStoreId()), schedulers.EvictLeaderName)
}

// ReportMinResolvedTS implements gRPC PDServer.
func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) {
	forwardedHost := getForwardedHost(ctx)
	if !s.isLocalRequest(forwardedHost) {
		client, err := s.getDelegateClient(ctx, forwardedHost)
		if err != nil {
			return nil, err
		}
		ctx = grpcutil.ResetForwardContext(ctx)
		return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request)
	}

	if err := s.validateRequest(request.GetHeader()); err != nil {
		return nil, err
	}

	rc := s.GetRaftCluster()
	if rc == nil {
		return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil
	}

	storeID := request.StoreId
	minResolvedTS := request.MinResolvedTs
	if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil {
		return nil, err
	}
	log.Debug("updated min resolved-ts",
		zap.Uint64("store", storeID),
		zap.Uint64("min resolved-ts", minResolvedTS))
	return &pdpb.ReportMinResolvedTsResponse{
		Header: s.header(),
	}, nil
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
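
The practical effect of this change is that PD returns its status.Errorf-based errors (for example ErrNotLeader as codes.Unavailable and ErrSendHeartbeatTimeout as codes.DeadlineExceeded) directly instead of wrapping them with errors.WithStack, so the gRPC layer can propagate the intended status code rather than a generic codes.Unknown (a wrapped error may not be recognized as a status error by the transport, depending on the grpc-go version). The sketch below is not part of this commit; it only illustrates, assuming a PD endpoint at 127.0.0.1:2379 and an insecure dial for simplicity, how a caller might branch on those codes with the standard google.golang.org/grpc/status helpers.

package main

import (
	"context"
	"log"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// handlePDError branches on the gRPC status code attached to a PD error.
// The code-to-cause mapping follows the errors defined in this diff; the
// retry/abort decisions are illustrative only.
func handlePDError(err error) {
	switch status.Code(err) {
	case codes.OK:
		log.Println("request succeeded")
	case codes.Unavailable:
		// "not leader" or "server not started": retry against another PD member.
		log.Println("PD unavailable, retry another member:", err)
	case codes.DeadlineExceeded:
		// e.g. "send heartbeat timeout".
		log.Println("PD request timed out:", err)
	case codes.FailedPrecondition:
		// e.g. mismatched cluster id rejected by validateRequest.
		log.Println("precondition failed, do not retry:", err)
	default:
		log.Println("unexpected PD error:", err)
	}
}

func main() {
	// Endpoint and dial options are placeholders for illustration.
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	_, err = pdpb.NewPDClient(conn).GetMembers(context.Background(), &pdpb.GetMembersRequest{})
	handlePDError(err)
}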
