diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 660df023135c..4465518ae11f 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -662,3 +662,56 @@ func maxDuration(x, y time.Duration) time.Duration { } return y } + +const ( + transientErrStringPrefix = "TRANSIENT_" + transientInvalidAckErrString = transientErrStringPrefix + "FAILURE_INVALID_ACK_ID" + permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID" +) + +// processResults processes AckResults by referring to errorStatus and errorsMap. +// The errors returned by the server in `errorStatus` or in `errorsByAckID` +// are used to complete the AckResults in `ackResMap` (with a success +// or error) or to return requests for further retries. +// Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220 +func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]error) ([]*AckResult, []*AckResult) { + var completedResults, retryResults []*AckResult + for ackID, res := range ackResMap { + // Handle special errors returned for ack/modack RPCs via the ErrorInfo + // sidecar metadata when exactly-once delivery is enabled. + if errAckID, ok := errorsByAckID[ackID]; ok { + errAckIDStr := errAckID.Error() + if strings.HasPrefix(errAckIDStr, transientErrStringPrefix) { + retryResults = append(retryResults, res) + } else { + if errAckIDStr == permanentInvalidAckErrString { + ipubsub.SetAckResult(res, AcknowledgeStatusInvalidAckID, errAckID) + } else { + ipubsub.SetAckResult(res, AcknowledgeStatusOther, errAckID) + } + completedResults = append(completedResults, res) + } + } else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) { + retryResults = append(retryResults, ackResMap[ackID]) + } else if errorStatus != nil { + // Other gRPC errors are not retried. + switch errorStatus.Code() { + case codes.PermissionDenied: + ipubsub.SetAckResult(res, AcknowledgeStatusPermissionDenied, errorStatus.Err()) + case codes.FailedPrecondition: + ipubsub.SetAckResult(res, AcknowledgeStatusFailedPrecondition, errorStatus.Err()) + default: + ipubsub.SetAckResult(res, AcknowledgeStatusOther, errorStatus.Err()) + } + completedResults = append(completedResults, res) + } else if res != nil { + // Since no error occurred, requests with AckResults are completed successfully. + ipubsub.SetAckResult(res, AcknowledgeStatusSuccess, nil) + completedResults = append(completedResults, res) + } else { + // All other requests are considered completed. + completedResults = append(completedResults, res) + } + } + return completedResults, retryResults +} diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 9ba21fc7f8a5..5508fab559d9 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/option" @@ -244,7 +245,7 @@ func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg _, ok := recvd[msgData] if ok { recvdMu.Unlock() - t.Fatalf("already saw \"%s\"\n", msgData) + t.Logf("already saw \"%s\"\n", msgData) return } recvd[msgData] = true @@ -618,3 +619,210 @@ func TestPingStreamAckDeadline(t *testing.T) { } iter.eoMu.RUnlock() } + +func compareCompletedRetryLengths(t *testing.T, completed, retry []*AckResult, wantCompleted, wantRetry int) { + if l := len(completed); l != wantCompleted { + t.Errorf("completed slice length got %d, want %d", l, wantCompleted) + } + if l := len(retry); l != wantRetry { + t.Errorf("retry slice length got %d, want %d", l, wantRetry) + } +} + +func TestExactlyOnceProcessRequests(t *testing.T) { + ctx := context.Background() + + transientInvalidAckError := errors.New(transientInvalidAckErrString) + permanentInvalidAckError := errors.New(permanentInvalidAckErrString) + + t.Run("NoResults", func(t *testing.T) { + // If the ackResMap is nil, then the resulting slices should be empty. + // nil maps here behave the same as if they were empty maps. + completed, retry := processResults(nil, nil, nil) + compareCompletedRetryLengths(t, completed, retry, 0, 0) + }) + + t.Run("NoErrorsNilAckResult", func(t *testing.T) { + // No errors so request should be completed even without an AckResult. + ackReqMap := map[string]*AckResult{ + "ackID": nil, + } + completed, retry := processResults(nil, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + }) + + t.Run("NoErrors", func(t *testing.T) { + // No errors so AckResult should be completed with success. + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + completed, retry := processResults(nil, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + + // We can obtain the AckStatus from AckResult if results are completed. + s, err := r.Get(ctx) + if err != nil { + t.Errorf("AckResult err: got %v, want nil", err) + } + if s != AcknowledgeStatusSuccess { + t.Errorf("got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("PermanentErrorInvalidAckID", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]error{ + "ackID1": permanentInvalidAckError, + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusInvalidAckID { + t.Errorf("got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("TransientErrorRetry", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]error{ + "ackID1": transientInvalidAckError, + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 0, 1) + }) + + t.Run("UnknownError", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]error{ + "ackID1": errors.New("unknown_error"), + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + + s, err := r.Get(ctx) + if s != AcknowledgeStatusOther { + t.Errorf("got %v, want AcknowledgeStatusOther", s) + } + if err == nil || err.Error() != "unknown_error" { + t.Errorf("AckResult err: got %s, want unknown_error", err.Error()) + } + }) + + t.Run("PermissionDenied", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.PermissionDenied, "permission denied") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusPermissionDenied { + t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s) + } + }) + + t.Run("FailedPrecondition", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.FailedPrecondition, "failed_precondition") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusFailedPrecondition { + t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s) + } + }) + + t.Run("OtherErrorStatus", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.OutOfRange, "out of range") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusOther { + t.Errorf("got %v, want AcknowledgeStatusOther", s) + } + }) + + t.Run("MixedSuccessFailureAcks", func(t *testing.T) { + r1 := ipubsub.NewAckResult() + r2 := ipubsub.NewAckResult() + r3 := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r1, + "ackID2": r2, + "ackID3": r3, + } + errorsMap := map[string]error{ + "ackID1": permanentInvalidAckError, + "ackID2": transientInvalidAckError, + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 2, 1) + // message with ackID "ackID1" fails + s, err := r1.Get(ctx) + if err == nil { + t.Error("r1: AckResult err: got nil, want err") + } + if s != AcknowledgeStatusInvalidAckID { + t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s) + } + + // message with ackID "ackID2" is to be retried + ctx2, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + _, err = r2.Get(ctx2) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("r2: AckResult.Get should timeout, got: %v", err) + } + + // message with ackID "ackID3" succeeds + s, err = r3.Get(ctx) + if err != nil { + t.Errorf("r3: AckResult err: got %v, want nil\n", err) + } + if s != AcknowledgeStatusSuccess { + t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) { + for c := range exactlyOnceDeliveryTemporaryRetryErrors { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(c, "") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 0, 1) + } + }) +} diff --git a/pubsub/message.go b/pubsub/message.go index 6cc7da8e516a..2797b6155eb4 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -171,10 +171,6 @@ func (ah *psAckHandler) OnNackWithResult() *AckResult { return ah.ackResult } -func (ah *psAckHandler) AckResult() *AckResult { - return ah.ackResult -} - func (ah *psAckHandler) done(ack bool) { if ah.calledDone { return diff --git a/pubsub/service.go b/pubsub/service.go index e8d636a01bb5..f9a1f18ea0f5 100644 --- a/pubsub/service.go +++ b/pubsub/service.go @@ -106,3 +106,19 @@ func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool } return r.defaultRetryer.Retry(err) } + +var ( + exactlyOnceDeliveryTemporaryRetryErrors = map[codes.Code]struct{}{ + codes.DeadlineExceeded: {}, + codes.ResourceExhausted: {}, + codes.Aborted: {}, + codes.Internal: {}, + codes.Unavailable: {}, + } +) + +// contains checks if grpc code v is in t, a set of retryable error codes. +func contains(v codes.Code, t map[codes.Code]struct{}) bool { + _, ok := t[v] + return ok +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index c82084245181..7036d51f6d0d 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -292,7 +292,7 @@ type SubscriptionConfig struct { // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration - // EnableExcactlyOnceDelivery configures Pub/Sub to provide the following guarantees + // EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees // for the delivery of a message with a given MessageID on this subscription: // // The message sent to a subscriber is guaranteed not to be resent