-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pubsub): add helper method for parsing ErrorInfos #6281
feat(pubsub): add helper method for parsing ErrorInfos #6281
Conversation
pubsub/iterator.go
Outdated
@@ -662,3 +664,70 @@ func maxDuration(x, y time.Duration) time.Duration { | |||
} | |||
return y | |||
} | |||
|
|||
func getStatus(err error) *status.Status { | |||
st, ok := status.FromError(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If for what you are doing it is easier to work with the raw grpc error this is fine, but all of our error should be of type APIError which might be easier to work with, although I did not look too close here to see if it is a good match.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually since we're not using it now, I'll remove it. I'll need it in a later PR.
pubsub/iterator.go
Outdated
// are used to complete the AckResults in `ackResMap` (with a success | ||
// or error) or to return requests for further retries. | ||
// Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220 | ||
func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsMap map[string]string) ([]*AckResult, []*AckResult) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should errorMap be map[string]error? Seems like you return some of these back to error. Would be good to perserve the orginal error as it could be a part of an error wrapping chain.
Also maybe errorsByAckID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Along the same lines I wonder if errorStatus should be an error as it gets "re-cast" back to an error but potentially loses some details in the process
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the map's name and value here to your suggestion to avoid the extra string conversion.
I think errorStatus
should remain as is. In order to initialize errorsByAckID
, I will need to convert the original grpc error to a status.Status
(or APIError
), and passing in an error
here would mean another re-cast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, actually I was wrong. The map should be map[string]string
since that's what APIError.Metadata
returns. I'll update this in a future PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few minor things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates!
* feat(pubsub): prepare iterator for exactly once (#6040) * feat(pubsub): read exactly once for SubscriptionProperties * rename vars to be specific this is exactly once delivery * feat(pubsub): send stream ack deadline seconds on exactly once change #6157 (#6162) * add RWMutex for guarding exactly once bool * feat(pubsub): send stream ack deadline seconds on exactly once change * remove extra test * feat(pubsub): add AckWithResult and NackWithResult to message (#6201) * add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * fix style issues with variadic function options * add back comment format to exported const * keep track of AckResults if exactly once is enabled * feat(pubsub): add helper method for parsing ErrorInfos (#6281) * add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * add process results * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * add process results * update process info with new enum names * add tests to process error info * add process results * update process info with new enum names * add process results * add tests to process error info * clean up iterator from merge * cleanup comments * add list of retriable errors to test * simplify testing of completed/retry slice lengths * remove getStatus/ackErrors methods * address code review comments * remove error string conversion step * feat(pubsub): complete AckResult for exactly once (#6387) * refactor sendAck to pipe errors to AckResult map * rewrite sendAck/sendModAck for exactly once * add AckResult to list of uncompared methods * use ackResultWithID in all locations * feat(pubsub): retry temporary failures for ack/modacks (#6485) * retry acks in goroutine * retry acks/modacks with transient errors * add retry test * add nack tests and support shorter timeouts * add integration tests * remove extra comment * add commnets to ack/modack methods in iterator * remove transient invalid ack id error string * reduce number of mutex locks * pass in StreamAckDeadline seconds for streaming pull requests in fake_test * fix lint issues * add changes to internal/pubsub/message * implement default ack handler functions in lite * use pubsub package ack result * use pinned library for pubsublite * resolve all lite Ack/NackWithResult to success
This PR corresponds with the 6th point in the PR sequencing doc. Logic is derived from Python's
process_requests
function.