Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): exactly-once delivery #9436

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,8 @@ TEST_F(SubscriberIntegrationTest, UnifiedCredentials) {
}

TEST_F(SubscriberIntegrationTest, ExactlyOnce) {
auto options =
google::cloud::Options{}.set<google::cloud::UnifiedCredentialsOption>(
google::cloud::MakeGoogleDefaultCredentials());
auto options = Options{}.set<google::cloud::UnifiedCredentialsOption>(
dbolduc marked this conversation as resolved.
Show resolved Hide resolved
MakeGoogleDefaultCredentials());
auto const using_emulator =
internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value();
if (using_emulator) {
Expand Down Expand Up @@ -505,21 +504,23 @@ TEST_F(SubscriberIntegrationTest, ExactlyOnce) {
SCOPED_TRACE("Search for message " + m.message_id());
std::unique_lock<std::mutex> lk(mu);
auto i = ids.find(m.message_id());
// Remember that Cloud Pub/Sub has "at least once" semantics, so a dup is
// perfectly possible, in that case the message would not be in the map
// of pending ids.
if (i == ids.end()) return;
// The first time just NACK the message to exercise that path, we expect
// Cloud Pub/Sub to retry.
ASSERT_FALSE(i == ids.end());
if (i->second == 0) {
std::move(h).nack();
++i->second;
lk.unlock();
std::move(h).nack().then([id = m.message_id()](auto f) {
auto status = f.get();
ASSERT_STATUS_OK(status) << " nack() failed for id=" << id;
});
return;
}
ids.erase(i);
if (ids.empty()) ids_empty.set_value();
lk.unlock();
std::move(h).ack();
std::move(h).ack().then([id = m.message_id()](auto f) {
auto status = f.get();
ASSERT_STATUS_OK(status) << " ack() failed for id=" << id;
});
};

auto result = subscriber.Subscribe(callback);
Expand Down
6 changes: 0 additions & 6 deletions google/cloud/pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ class Subscriber {
* messages. For example, because there was an unrecoverable error trying
* to receive data. Calling `.cancel()` in this object will (eventually)
* terminate the session and satisfy the future.
*
* [std-function-link]:
* https://en.cppreference.com/w/cpp/utility/functional/function
*/
future<Status> Subscribe(ApplicationCallback cb) {
return connection_->Subscribe({std::move(cb)});
Expand Down Expand Up @@ -152,9 +149,6 @@ class Subscriber {
* messages. For example, because there was an unrecoverable error trying
* to receive data. Calling `.cancel()` in this object will (eventually)
* terminate the session and satisfy the future.
*
* [std-function-link]:
* https://en.cppreference.com/w/cpp/utility/functional/function
*/
future<Status> Subscribe(ExactlyOnceApplicationCallback cb) {
dbolduc marked this conversation as resolved.
Show resolved Hide resolved
return connection_->ExactlyOnceSubscribe({std::move(cb)});
Expand Down
10 changes: 8 additions & 2 deletions google/cloud/pubsub/subscriber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ class SubscriberConnection {
ApplicationCallback callback;
};

/// Defines the interface for `Subscriber::Subscribe()`
/// Defines the interface for `Subscriber::Subscribe(ApplicationCallback)`
virtual future<Status> Subscribe(SubscribeParams p);

struct ExactlyOnceSubscribeParams {
ExactlyOnceApplicationCallback callback;
};

/// Defines the interface for `Subscriber::Subscribe()`
/**
* Defines the interface for
* `Subscriber::Subscribe(ExactlyOnceApplicationCallback)`.
*
* We use a different name for this function (as opposed to an overload) to
* simplify the use is mocks.
dbolduc marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p);
};

Expand Down