Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add helper method for parsing ErrorInfos #6281

Merged
merged 26 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fb01253
add AckResult and related methods
hongalex Jun 14, 2022
6327975
feat(pubsub): add AckWithResult and NackWithResult to message
hongalex Jun 16, 2022
17d96f9
feat(pubsub): add AckWithResult and NackWithResult to message
hongalex Jun 16, 2022
9f58224
add comments for AckResult and bring over AcknowledgeStatus from inte…
hongalex Jun 16, 2022
06f21cb
update function definition for IgnoreExported in tests
hongalex Jun 17, 2022
c170640
temporarily update internal/pubsub for samples test
hongalex Jun 18, 2022
9f5510c
add process results
hongalex Jun 24, 2022
9b6917c
change enum naming to AcknowledgeStatus
hongalex Jun 24, 2022
acdd6c4
remove extra enums in temp internal message.go
hongalex Jun 24, 2022
ae6d493
remove internal/pubsub/message.go
hongalex Jun 24, 2022
8ab1005
add process results
hongalex Jun 24, 2022
ca947ae
update process info with new enum names
hongalex Jun 24, 2022
1ac3a20
resolve merge conflict
hongalex Jun 24, 2022
f212ab9
add tests to process error info
hongalex Jun 28, 2022
f6262ed
add process results
hongalex Jun 24, 2022
4ab7345
update process info with new enum names
hongalex Jun 24, 2022
fa7b992
add process results
hongalex Jun 24, 2022
414b2c0
add tests to process error info
hongalex Jun 28, 2022
876dfbe
resolve merge with pubsub-exactly-once
hongalex Jun 29, 2022
931c1f4
clean up iterator from merge
hongalex Jun 29, 2022
10c8d93
cleanup comments
hongalex Jun 29, 2022
f6c707a
add list of retriable errors to test
hongalex Jun 29, 2022
3c97632
simplify testing of completed/retry slice lengths
hongalex Jun 30, 2022
7cd8bc5
remove getStatus/ackErrors methods
hongalex Jul 1, 2022
31963a0
address code review comments
hongalex Jul 7, 2022
02ac348
remove error string conversion step
hongalex Jul 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pubsub
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
Expand All @@ -27,6 +28,7 @@ import (
"cloud.google.com/go/pubsub/internal/distribution"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -662,3 +664,70 @@ func maxDuration(x, y time.Duration) time.Duration {
}
return y
}

func getStatus(err error) *status.Status {
st, ok := status.FromError(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If for what you are doing it is easier to work with the raw grpc error this is fine, but all of our error should be of type APIError which might be easier to work with, although I did not look too close here to see if it is a good match.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually since we're not using it now, I'll remove it. I'll need it in a later PR.

if !ok {
return nil
}
return st
}

// getAckErrors retrieves the metadata of an rpc error if available.
func getAckErrors(err error) map[string]string {
st := getStatus(err)
if st != nil {
for _, detail := range st.Details() {
info, _ := detail.(*errdetails.ErrorInfo)
return info.GetMetadata()
}
}
return nil
}

// processResults processes AckResults by referring to errorStatus and errorsMap.
// The errors returned by the server in `errorStatus` or in `errorsMap`
// are used to complete the AckResults in `ackResMap` (with a success
// or error) or to return requests for further retries.
// Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220
func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsMap map[string]string) ([]*AckResult, []*AckResult) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should errorMap be map[string]error? Seems like you return some of these back to error. Would be good to perserve the orginal error as it could be a part of an error wrapping chain.

Also maybe errorsByAckID?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Along the same lines I wonder if errorStatus should be an error as it gets "re-cast" back to an error but potentially loses some details in the process

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the map's name and value here to your suggestion to avoid the extra string conversion.

I think errorStatus should remain as is. In order to initialize errorsByAckID, I will need to convert the original grpc error to a status.Status (or APIError), and passing in an error here would mean another re-cast.

Copy link
Member Author

@hongalex hongalex Jul 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, actually I was wrong. The map should be map[string]string since that's what APIError.Metadata returns. I'll update this in a future PR.

var completedResults, retryResults []*AckResult
for ackID, res := range ackResMap {
// Handle special errors returned for ack/modack RPCs via the ErrorInfo
// sidecar metadata when exactly-once delivery is enabled.
if exactlyOnceErrStr, ok := errorsMap[ackID]; ok {
if strings.HasPrefix(exactlyOnceErrStr, "TRANSIENT_") {
hongalex marked this conversation as resolved.
Show resolved Hide resolved
retryResults = append(retryResults, res)
} else {
exactlyOnceErr := fmt.Errorf(exactlyOnceErrStr)
if exactlyOnceErrStr == "PERMANENT_FAILURE_INVALID_ACK_ID" {
ipubsub.SetAckResult(res, AcknowledgeStatusInvalidAckID, exactlyOnceErr)
} else {
ipubsub.SetAckResult(res, AcknowledgeStatusOther, exactlyOnceErr)
}
completedResults = append(completedResults, res)
}
} else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) {
retryResults = append(retryResults, ackResMap[ackID])
} else if errorStatus != nil {
// Other gRPC errors are not retried.
switch errorStatus.Code() {
case codes.PermissionDenied:
ipubsub.SetAckResult(res, AcknowledgeStatusPermissionDenied, errorStatus.Err())
case codes.FailedPrecondition:
ipubsub.SetAckResult(res, AcknowledgeStatusFailedPrecondition, errorStatus.Err())
default:
ipubsub.SetAckResult(res, AcknowledgeStatusOther, errorStatus.Err())
}
completedResults = append(completedResults, res)
} else if res != nil {
// Since no error occurred, requests with AckResults are completed successfully.
ipubsub.SetAckResult(res, AcknowledgeStatusSuccess, nil)
completedResults = append(completedResults, res)
} else {
// All other requests are considered completed.
completedResults = append(completedResults, res)
}
}
return completedResults, retryResults
}
205 changes: 205 additions & 0 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

ipubsub "cloud.google.com/go/internal/pubsub"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
Expand Down Expand Up @@ -618,3 +619,207 @@ func TestPingStreamAckDeadline(t *testing.T) {
}
iter.eoMu.RUnlock()
}

func compareCompletedRetryLengths(t *testing.T, completed, retry []*AckResult, wantCompleted, wantRetry int) {
hongalex marked this conversation as resolved.
Show resolved Hide resolved
if l := len(completed); l != wantCompleted {
t.Errorf("completed slice length is %d, want %d\n", l, wantCompleted)
hongalex marked this conversation as resolved.
Show resolved Hide resolved
}
if l := len(retry); l != wantRetry {
t.Errorf("retry slice length is %d, want %d\n", l, wantRetry)
}
}

func TestExactlyOnceProcessRequests(t *testing.T) {
ctx := context.Background()

t.Run("NoResults", func(t *testing.T) {
// If the ackResMap is nil, then the resulting slices should be empty.
// nil maps here behave the same as if they were empty maps.
completed, retry := processResults(nil, nil, nil)
compareCompletedRetryLengths(t, completed, retry, 0, 0)
})

t.Run("NoErrorsNilAckResult", func(t *testing.T) {
// No errors so request should be completed even without an AckResult.
ackReqMap := map[string]*AckResult{
"ackID": nil,
}
completed, retry := processResults(nil, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
})

t.Run("NoErrors", func(t *testing.T) {
// No errors so AckResult should be completed with success.
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
completed, retry := processResults(nil, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)

// We can obtain the AckStatus from AckResult if results are completed.
s, err := r.Get(ctx)
if err != nil {
t.Errorf("got err from AckResult: %v", err)
}
if s != AcknowledgeStatusSuccess {
t.Errorf("expected AcknowledgeStatusSuccess, got %v", s)
hongalex marked this conversation as resolved.
Show resolved Hide resolved
}
})

t.Run("PermanentErrorInvalidAckID", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": "PERMANENT_FAILURE_INVALID_ACK_ID",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("expected error from AckResult, got nil\n")
}
if s != AcknowledgeStatusInvalidAckID {
t.Errorf("expected AcknowledgeStatusSuccess, got %v", s)
}
})

t.Run("TransientErrorRetry", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": "TRANSIENT_FAILURE_INVALID_ACK_ID",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 0, 1)
})

t.Run("UnknownError", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": "unknown_error",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 1, 0)

s, err := r.Get(ctx)
if s != AcknowledgeStatusOther {
t.Errorf("expected AcknowledgeStatusOther, got %v", s)
}
if err == nil || err.Error() != "unknown_error" {
t.Errorf("expected unknown_error, got: %s", err.Error())
}
})

t.Run("PermissionDenied", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.PermissionDenied, "permission denied")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("expected error from AckResult, got nil\n")
}
if s != AcknowledgeStatusPermissionDenied {
t.Errorf("expected AcknowledgeStatusPermissionDenied, got %v", s)
}
})

t.Run("FailedPrecondition", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.FailedPrecondition, "failed_precondition")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("expected error from AckResult, got nil\n")
}
if s != AcknowledgeStatusFailedPrecondition {
t.Errorf("expected AcknowledgeStatusFailedPrecondition, got %v", s)
}
})

t.Run("OtherErrorStatus", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.OutOfRange, "out of range")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("expected error from AckResult, got nil\n")
}
if s != AcknowledgeStatusOther {
t.Errorf("expected AcknowledgeStatusOther, got %v", s)
}
})

t.Run("MixedSuccessFailureAcks", func(t *testing.T) {
r1 := ipubsub.NewAckResult()
r2 := ipubsub.NewAckResult()
r3 := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r1,
"ackID2": r2,
"ackID3": r3,
}
errorsMap := map[string]string{
"ackID1": "PERMANENT_FAILURE_INVALID_ACK_ID",
"ackID2": "TRANSIENT_FAILURE_INVALID_ACK_ID",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 2, 1)
// message with ackID "ackID1" fails
s, err := r1.Get(ctx)
if err == nil {
t.Error("r1: expected error from AckResult, got nil\n")
}
if s != AcknowledgeStatusInvalidAckID {
t.Errorf("r1: expected AcknowledgeInvalidAckID, got %v", s)
}

// message with ackID "ackID2" is to be retried
ctx2, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
_, err = r2.Get(ctx2)
if err.Error() != "context deadline exceeded" {
hongalex marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("r2: expected AckResult.Get to timeout, got: %v", err)
}

// message with ackID "ackID3" succeeds
s, err = r3.Get(ctx)
if err != nil {
t.Errorf("r3: got err from AckResult.Get: %v\n", err)
}
if s != AcknowledgeStatusSuccess {
t.Errorf("r3: expected AcknowledgeStatusSuccess, got %v", s)
}
})

t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) {
for _, c := range exactlyOnceDeliveryTemporaryRetryErrors {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(c, "")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 0, 1)
}
})
}
4 changes: 0 additions & 4 deletions pubsub/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ func (ah *psAckHandler) OnNackWithResult() *AckResult {
return ah.ackResult
}

func (ah *psAckHandler) AckResult() *AckResult {
return ah.ackResult
}

func (ah *psAckHandler) done(ack bool) {
if ah.calledDone {
return
Expand Down
21 changes: 21 additions & 0 deletions pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,24 @@ func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool
}
return r.defaultRetryer.Retry(err)
}

var (
exactlyOnceDeliveryTemporaryRetryErrors = []codes.Code{
hongalex marked this conversation as resolved.
Show resolved Hide resolved
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.Internal,
codes.Unavailable,
}
)

// contains checks if grpc code v is in t, a slice of retryable error codes.
// Consider replacing with generics's slice.Contains once go 1.18 is the min version.
func contains(v codes.Code, t []codes.Code) bool {
for _, c := range t {
if v == c {
return true
}
}
return false
}