Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/streamhelper: fix subscribe error #39689

Merged
merged 12 commits into from
Dec 7, 2022
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, erro
return &item, nil
default:
}
return nil, t.cx.Err()
return nil, status.Error(codes.Canceled, t.cx.Err().Error())
}
}

Expand Down
10 changes: 6 additions & 4 deletions br/pkg/streamhelper/flush_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,22 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) {

func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
s.clearError()
// We should shutdown the background task firstly.
// Once it yields some error during shuting down, the error won't be brought to next run.
s.close()
s.clearError()

c, err := dialer.GetLogBackupClient(ctx, s.storeID)
if err != nil {
return err
return errors.Annotate(err, "failed to get log backup client")
}
cx, cancel := context.WithCancel(ctx)
cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{
ClientId: uuid.NewString(),
})
if err != nil {
cancel()
return err
return errors.Annotate(err, "failed to subscribe events")
}
s.cancel = cancel
s.background = spawnJoinable(func() { s.listenOver(cli) })
Expand All @@ -249,7 +251,7 @@ func (s *subscription) listenOver(cli eventStream) {
msg, err := cli.Recv()
if err != nil {
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
if err == io.EOF || err == context.Canceled {
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
return
}
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/streamhelper/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func installSubscribeSupport(c *fakeCluster) {
Expand Down Expand Up @@ -105,6 +107,25 @@ func TestHasFailureStores(t *testing.T) {
req.NoError(sub.PendingErrors())
}

func TestStoreOffline(t *testing.T) {
req := require.New(t)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
c.splitAndScatter("0001", "0002", "0003", "0008", "0009")
installSubscribeSupport(c)

c.onGetClient = func(u uint64) error {
return status.Error(codes.DataLoss, "upon an eclipsed night, some of data (not all data) have fled from the dataset")
}
sub := streamhelper.NewSubscriber(c, c)
req.NoError(sub.UpdateStoreTopology(ctx))
req.Error(sub.PendingErrors())

c.onGetClient = nil
sub.HandleErrors(ctx)
req.NoError(sub.PendingErrors())
}

func TestStoreRemoved(t *testing.T) {
req := require.New(t)
ctx := context.Background()
Expand Down