diff --git a/pkg/googlecloud/subscriber.go b/pkg/googlecloud/subscriber.go index ec71de3..cbec3af 100644 --- a/pkg/googlecloud/subscriber.go +++ b/pkg/googlecloud/subscriber.go @@ -174,6 +174,9 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa receiveFinished := make(chan struct{}) s.allSubscriptionsWaitGroup.Add(1) go func() { + exponentialBackoff := backoff.NewExponentialBackOff() + exponentialBackoff.MaxElapsedTime = 0 // 0 means it never expires + if err := backoff.Retry(func() error { err := s.receive(ctx, sub, logFields, output) if err == nil { @@ -188,7 +191,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa s.logger.Error("Receiving messages failed, retrying", err, logFields) return err - }, backoff.NewExponentialBackOff()); err != nil { + }, exponentialBackoff); err != nil { s.logger.Error("Retrying receiving messages failed", err, logFields) }