diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 5e847a6ea78c4..1dff77dd72864 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -134,7 +134,7 @@ func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, erro return &item, nil default: } - return nil, t.cx.Err() + return nil, status.Error(codes.Canceled, t.cx.Err().Error()) } } diff --git a/br/pkg/streamhelper/flush_subscriber.go b/br/pkg/streamhelper/flush_subscriber.go index 64607e68858f8..70cd4d8e4501d 100644 --- a/br/pkg/streamhelper/flush_subscriber.go +++ b/br/pkg/streamhelper/flush_subscriber.go @@ -211,12 +211,14 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) { func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error { log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt)) - s.clearError() + // We should shutdown the background task firstly. + // Once it yields some error during shuting down, the error won't be brought to next run. s.close() + s.clearError() c, err := dialer.GetLogBackupClient(ctx, s.storeID) if err != nil { - return err + return errors.Annotate(err, "failed to get log backup client") } cx, cancel := context.WithCancel(ctx) cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{ @@ -224,7 +226,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e }) if err != nil { cancel() - return err + return errors.Annotate(err, "failed to subscribe events") } s.cancel = cancel s.background = spawnJoinable(func() { s.listenOver(cli) }) @@ -249,7 +251,7 @@ func (s *subscription) listenOver(cli eventStream) { msg, err := cli.Recv() if err != nil { log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err)) - if err == io.EOF || err == context.Canceled { + if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled { return } s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID)) diff --git a/br/pkg/streamhelper/subscription_test.go b/br/pkg/streamhelper/subscription_test.go index 519801ce2b448..2341cb05dc01e 100644 --- a/br/pkg/streamhelper/subscription_test.go +++ b/br/pkg/streamhelper/subscription_test.go @@ -11,6 +11,8 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/spans" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func installSubscribeSupport(c *fakeCluster) { @@ -105,6 +107,25 @@ func TestHasFailureStores(t *testing.T) { req.NoError(sub.PendingErrors()) } +func TestStoreOffline(t *testing.T) { + req := require.New(t) + ctx := context.Background() + c := createFakeCluster(t, 4, true) + c.splitAndScatter("0001", "0002", "0003", "0008", "0009") + installSubscribeSupport(c) + + c.onGetClient = func(u uint64) error { + return status.Error(codes.DataLoss, "upon an eclipsed night, some of data (not all data) have fled from the dataset") + } + sub := streamhelper.NewSubscriber(c, c) + req.NoError(sub.UpdateStoreTopology(ctx)) + req.Error(sub.PendingErrors()) + + c.onGetClient = nil + sub.HandleErrors(ctx) + req.NoError(sub.PendingErrors()) +} + func TestStoreRemoved(t *testing.T) { req := require.New(t) ctx := context.Background()