Skip to content

Commit

Permalink
operator: make gRPC connections synced (pingcap#52051) (pingcap#53)
Browse files Browse the repository at this point in the history
close pingcap#52049

Co-authored-by: 山岚 <[email protected]>
  • Loading branch information
2 people authored and GitHub Enterprise committed Mar 28, 2024
1 parent a754a18 commit bc045c8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
32 changes: 31 additions & 1 deletion br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package preparesnap

import (
"context"
"slices"
"sync"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -128,6 +130,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
return &gRPCGoAdapter{
inner: p,
}
}

// GrpcGoAdapter makes the `Send` call synchronous.
// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe.
// But concurrency call to `send` and `recv` is safe.
// This type is exported for testing.
type gRPCGoAdapter struct {
inner PrepareClient
sendMu sync.Mutex
recvMu sync.Mutex
}

func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
Expand All @@ -142,7 +172,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
m.onCreateStore(m.stores[storeID])
}
return m.stores[storeID], nil
return AdaptForGRPCInTest(m.stores[storeID]), nil
}

func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down

0 comments on commit bc045c8

Please sign in to comment.