feat(pubsub): complete AckResult for exactly once #6387
	for _, ackID := range toSend {
		resultsByAckID[ackID] = m[ackID]
	}
	// TODO(hongalex): retry the ackIDs with transient retriable errors.
I'm assuming this is for the mentioned next PR?
Yeah it is, I just left it as a todo for now.
	AckIds: toSend,
})
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
This actually brings up an interesting point for me: if the EO semantics are enabled, and we receive messages and they're queued up for acks, and then EO is disabled, do we handle the already-queued messages using the EO semantics, or switch immediately?
Perhaps @maheshgattani can comment on this. I don't think switching exactly once delivery on/off is a common use case, so either behavior should be ok here. It's significantly easier to keep track of this through the actual on/off status of exactly once (through StreamingPullResponse) rather than keeping track of exactly once for every message.
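The approach described in the comment above, keeping one on/off flag that follows the latest StreamingPullResponse instead of recording exactly-once state per message, could look roughly like this sketch. The iterator type here is a stripped-down stand-in, and onResponse and useExactlyOnce are hypothetical helper names; only the eoMu and enableExactlyOnceDelivery fields appear in the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// iterator is a simplified stand-in for the client's message iterator.
// Only the two fields below are taken from the diff under review.
type iterator struct {
	eoMu                      sync.RWMutex
	enableExactlyOnceDelivery bool
}

// onResponse (hypothetical) records the exactly-once status carried by
// the most recent streaming pull response.
func (it *iterator) onResponse(exactlyOnceEnabled bool) {
	it.eoMu.Lock()
	it.enableExactlyOnceDelivery = exactlyOnceEnabled
	it.eoMu.Unlock()
}

// useExactlyOnce (hypothetical) reports the status at the moment acks
// are sent, so already-queued acks follow whatever the flag says now.
func (it *iterator) useExactlyOnce() bool {
	it.eoMu.RLock()
	defer it.eoMu.RUnlock()
	return it.enableExactlyOnceDelivery
}

func main() {
	it := &iterator{}
	it.onResponse(true)
	fmt.Println("exactly once:", it.useExactlyOnce())
	it.onResponse(false)
	fmt.Println("exactly once:", it.useExactlyOnce())
}
```

With this shape, messages queued while exactly once was enabled are acked with whichever semantics are active when the ack is sent, which is the "switch immediately" behavior raised in the question.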
For my own curiosity, what's left in order to support exactly once delivery for pubsub messages with the go client?
@gnagel This PR needs to be merged, followed by retrying of temporary failures. After that we can start merging everything.
Fantastic, thank you for working on this. I'm very excited to see this roll out and use it in production 😎
* feat(pubsub): prepare iterator for exactly once (#6040)
* feat(pubsub): read exactly once for SubscriptionProperties
* rename vars to be specific this is exactly once delivery
* feat(pubsub): send stream ack deadline seconds on exactly once change #6157 (#6162)
* add RWMutex for guarding exactly once bool
* feat(pubsub): send stream ack deadline seconds on exactly once change
* remove extra test
* feat(pubsub): add AckWithResult and NackWithResult to message (#6201)
* add AckResult and related methods
* feat(pubsub): add AckWithResult and NackWithResult to message
* feat(pubsub): add AckWithResult and NackWithResult to message
* add comments for AckResult and bring over AcknowledgeStatus from internal
* update function definition for IgnoreExported in tests
* temporarily update internal/pubsub for samples test
* change enum naming to AcknowledgeStatus
* remove extra enums in temp internal message.go
* remove internal/pubsub/message.go
* fix style issues with variadic function options
* add back comment format to exported const
* keep track of AckResults if exactly once is enabled
* feat(pubsub): add helper method for parsing ErrorInfos (#6281)
* add AckResult and related methods
* feat(pubsub): add AckWithResult and NackWithResult to message
* feat(pubsub): add AckWithResult and NackWithResult to message
* add comments for AckResult and bring over AcknowledgeStatus from internal
* update function definition for IgnoreExported in tests
* temporarily update internal/pubsub for samples test
* add process results
* change enum naming to AcknowledgeStatus
* remove extra enums in temp internal message.go
* remove internal/pubsub/message.go
* add process results
* update process info with new enum names
* add tests to process error info
* add process results
* update process info with new enum names
* add process results
* add tests to process error info
* clean up iterator from merge
* cleanup comments
* add list of retriable errors to test
* simplify testing of completed/retry slice lengths
* remove getStatus/ackErrors methods
* address code review comments
* remove error string conversion step
* feat(pubsub): complete AckResult for exactly once (#6387)
* refactor sendAck to pipe errors to AckResult map
* rewrite sendAck/sendModAck for exactly once
* add AckResult to list of uncompared methods
* use ackResultWithID in all locations
* feat(pubsub): retry temporary failures for ack/modacks (#6485)
* retry acks in goroutine
* retry acks/modacks with transient errors
* add retry test
* add nack tests and support shorter timeouts
* add integration tests
* remove extra comment
* add comments to ack/modack methods in iterator
* remove transient invalid ack id error string
* reduce number of mutex locks
* pass in StreamAckDeadline seconds for streaming pull requests in fake_test
* fix lint issues
* add changes to internal/pubsub/message
* implement default ack handler functions in lite
* use pubsub package ack result
* use pinned library for pubsublite
* resolve all lite Ack/NackWithResult to success
Pipe the output of Ack/Modack operations to the AckResults that are returned when users call Ack/NackWithResult. This involves removing a callback, which leaves sendAck and sendModAck with some repeated code but also makes them more readable. In addition, we no longer keep retrying these methods until they fail, since retries will be handled by a later PR.

To support this, this PR also changes:
* processResults back to using a map[string]string for ackID->error strings (since that's what the apiError metadata field returns)
* the processResults and pending maps to use a new struct that contains both the ackID and the AckResult (for retrying later on)
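To make the description above concrete, here is a minimal, self-contained sketch of pairing each ack ID with its result and splitting a batch using a map of ackID->error strings. It is not the library's implementation: ackResult is a simplified stand-in for AckResult, the processResults signature is invented for illustration, and the "TRANSIENT_" prefix used to mark retriable errors is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ackResult is a simplified stand-in for the library's AckResult.
type ackResult struct {
	done bool  // true once the outcome is final
	err  error // nil on success
}

// ackResultWithID pairs an ack ID with its result so that a failed ID
// can be retried later without losing the result the user is waiting on.
type ackResultWithID struct {
	ackID string
	res   *ackResult
}

// transientPrefix is an assumed marker for retriable error strings.
const transientPrefix = "TRANSIENT_"

// processResults (illustrative signature) splits pending acks into
// completed and retriable sets based on an ackID -> error string map.
// IDs missing from errsByAckID are treated as successful.
func processResults(pending map[string]*ackResult, errsByAckID map[string]string) (completed, retry []ackResultWithID) {
	for ackID, res := range pending {
		item := ackResultWithID{ackID: ackID, res: res}
		msg, failed := errsByAckID[ackID]
		switch {
		case !failed:
			res.done = true
			completed = append(completed, item)
		case strings.HasPrefix(msg, transientPrefix):
			// Leave the result unresolved; a later attempt may succeed.
			retry = append(retry, item)
		default:
			res.done = true
			res.err = errors.New(msg)
			completed = append(completed, item)
		}
	}
	return completed, retry
}

func main() {
	pending := map[string]*ackResult{"a": {}, "b": {}, "c": {}}
	errs := map[string]string{
		"b": "TRANSIENT_FAILURE_UNORDERED_ACK_ID",
		"c": "PERMANENT_FAILURE_INVALID_ACK_ID",
	}
	completed, retry := processResults(pending, errs)
	fmt.Println("completed:", len(completed), "retry:", len(retry))
}
```

Keeping the ack ID next to its result means the retry path only needs the retry slice: it can resend those IDs and resolve the same result objects once a final outcome is known.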