diff --git a/google/cloud/pubsub/CMakeLists.txt b/google/cloud/pubsub/CMakeLists.txt index a43ec86e4ea78..b8c2e72c025f0 100644 --- a/google/cloud/pubsub/CMakeLists.txt +++ b/google/cloud/pubsub/CMakeLists.txt @@ -42,6 +42,8 @@ add_library( backoff_policy.h connection_options.cc connection_options.h + exactly_once_ack_handler.cc + exactly_once_ack_handler.h internal/batch_sink.h internal/batching_publisher_connection.cc internal/batching_publisher_connection.h @@ -52,8 +54,6 @@ add_library( internal/default_batch_sink.h internal/defaults.cc internal/defaults.h - internal/exactly_once_ack_handler.cc - internal/exactly_once_ack_handler.h internal/exactly_once_policies.cc internal/exactly_once_policies.h internal/extend_leases_with_retry.cc @@ -185,6 +185,7 @@ target_sources( google_cloud_cpp_pubsub_mocks INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_ack_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_exactly_once_ack_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_publisher_connection.h ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_schema_admin_connection.h ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_subscription_admin_connection.h @@ -243,10 +244,10 @@ function (google_cloud_cpp_pubsub_client_define_tests) set(pubsub_client_unit_tests # cmake-format: sort ack_handler_test.cc + exactly_once_ack_handler_test.cc internal/batching_publisher_connection_test.cc internal/default_batch_sink_test.cc internal/defaults_test.cc - internal/exactly_once_ack_handler_test.cc internal/exactly_once_policies_test.cc internal/extend_leases_with_retry_test.cc internal/flow_controlled_publisher_connection_test.cc diff --git a/google/cloud/pubsub/application_callback.h b/google/cloud/pubsub/application_callback.h index 96c5482ac4339..6f0acc171800c 100644 --- a/google/cloud/pubsub/application_callback.h +++ b/google/cloud/pubsub/application_callback.h @@ -24,6 +24,7 @@ namespace pubsub { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN class Message; class AckHandler; +class ExactlyOnceAckHandler; /** * Defines the interface for application-level callbacks. @@ -34,6 +35,18 @@ class AckHandler; */ using ApplicationCallback = std::function; +/** + * Defines the interface for application-level callbacks with exactly-once + * delivery. + * + * Applications provide a callable compatible with this type to receive + * messages. They acknowledge (or reject) messages using + * `ExactlyOnceAckHandler`. This is a move-only type to support asynchronous + * acknowledgments. + */ +using ExactlyOnceApplicationCallback = + std::function; + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub } // namespace cloud diff --git a/google/cloud/pubsub/doc/subscriber-mock.dox b/google/cloud/pubsub/doc/subscriber-mock.dox index d8e0aa283a993..42abd87f95365 100644 --- a/google/cloud/pubsub/doc/subscriber-mock.dox +++ b/google/cloud/pubsub/doc/subscriber-mock.dox @@ -55,6 +55,12 @@ Finally we present the full code for this example: @snippet mock_subscriber.cc all +### Mocking with Exactly-Once Delivery + +To mock exactly once delivery is very similar: + +@snippet mock_subscriber.cc exactly-once + [googletest-link]: https://github.com/google/googletest [googlemock-link]: https://github.com/google/googletest/tree/main/googlemock diff --git a/google/cloud/pubsub/internal/exactly_once_ack_handler.cc b/google/cloud/pubsub/exactly_once_ack_handler.cc similarity index 91% rename from google/cloud/pubsub/internal/exactly_once_ack_handler.cc rename to google/cloud/pubsub/exactly_once_ack_handler.cc index e8d3b76851108..1e2c52146df30 100644 --- a/google/cloud/pubsub/internal/exactly_once_ack_handler.cc +++ b/google/cloud/pubsub/exactly_once_ack_handler.cc @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "google/cloud/pubsub/internal/exactly_once_ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include namespace google { namespace cloud { -namespace pubsub_internal { +namespace pubsub { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN static_assert(!std::is_copy_assignable::value, @@ -36,6 +36,6 @@ ExactlyOnceAckHandler::~ExactlyOnceAckHandler() { ExactlyOnceAckHandler::Impl::~Impl() = default; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END -} // namespace pubsub_internal +} // namespace pubsub } // namespace cloud } // namespace google diff --git a/google/cloud/pubsub/internal/exactly_once_ack_handler.h b/google/cloud/pubsub/exactly_once_ack_handler.h similarity index 94% rename from google/cloud/pubsub/internal/exactly_once_ack_handler.h rename to google/cloud/pubsub/exactly_once_ack_handler.h index 5fc0d4cb9b470..2eeed99fde99d 100644 --- a/google/cloud/pubsub/internal/exactly_once_ack_handler.h +++ b/google/cloud/pubsub/exactly_once_ack_handler.h @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_EXACTLY_ONCE_ACK_HANDLER_H -#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_EXACTLY_ONCE_ACK_HANDLER_H +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_EXACTLY_ONCE_ACK_HANDLER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_EXACTLY_ONCE_ACK_HANDLER_H #include "google/cloud/pubsub/version.h" #include "google/cloud/future.h" @@ -22,7 +22,7 @@ namespace google { namespace cloud { -namespace pubsub_internal { +namespace pubsub { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN /** @@ -135,8 +135,8 @@ class ExactlyOnceAckHandler { }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END -} // namespace pubsub_internal +} // namespace pubsub } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_EXACTLY_ONCE_ACK_HANDLER_H +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_EXACTLY_ONCE_ACK_HANDLER_H diff --git a/google/cloud/pubsub/internal/exactly_once_ack_handler_test.cc b/google/cloud/pubsub/exactly_once_ack_handler_test.cc similarity index 79% rename from google/cloud/pubsub/internal/exactly_once_ack_handler_test.cc rename to google/cloud/pubsub/exactly_once_ack_handler_test.cc index a9115dc57ff87..5fb40ed065bdf 100644 --- a/google/cloud/pubsub/internal/exactly_once_ack_handler_test.cc +++ b/google/cloud/pubsub/exactly_once_ack_handler_test.cc @@ -12,39 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "google/cloud/pubsub/internal/exactly_once_ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" +#include "google/cloud/pubsub/mocks/mock_exactly_once_ack_handler.h" #include "google/cloud/testing_util/status_matchers.h" #include "absl/memory/memory.h" #include namespace google { namespace cloud { -namespace pubsub_internal { +namespace pubsub { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::pubsub_mocks::MockExactlyOnceAckHandler; using ::google::cloud::testing_util::StatusIs; using ::testing::ByMove; using ::testing::Return; -// TODO(#9327) move this class to the public mocks -/** - * A googlemock-based mock for - * [pubsub::ExactlyOnceAckHandler::Impl][mocked-link] - * - * [mocked-link]: @ref - * google::cloud::pubsub_internal::ExactlyOnceAckHandler::Impl - * - * @see @ref subscriber-exactly-once-mock for an example using this class. - */ -class MockExactlyOnceAckHandler : public ExactlyOnceAckHandler::Impl { - public: - MOCK_METHOD(future, ack, (), (override)); - MOCK_METHOD(future, nack, (), (override)); - MOCK_METHOD(std::string, ack_id, ()); - MOCK_METHOD(std::int32_t, delivery_attempt, (), (const, override)); -}; - TEST(AckHandlerTest, AutoNack) { auto mock = absl::make_unique(); EXPECT_CALL(*mock, nack()) @@ -96,6 +80,6 @@ TEST(AckHandlerTest, Nack) { } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END -} // namespace pubsub_internal +} // namespace pubsub } // namespace cloud } // namespace google diff --git a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl index 5dd31dfccfe60..a56fe3ea13e1e 100644 --- a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl +++ b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl @@ -21,13 +21,13 @@ google_cloud_cpp_pubsub_hdrs = [ "application_callback.h", "backoff_policy.h", "connection_options.h", + "exactly_once_ack_handler.h", "internal/batch_sink.h", "internal/batching_publisher_connection.h", "internal/containing_publisher_connection.h", "internal/create_channel.h", "internal/default_batch_sink.h", "internal/defaults.h", - "internal/exactly_once_ack_handler.h", "internal/exactly_once_policies.h", "internal/extend_leases_with_retry.h", "internal/flow_controlled_publisher_connection.h", @@ -85,11 +85,11 @@ google_cloud_cpp_pubsub_hdrs = [ google_cloud_cpp_pubsub_srcs = [ "ack_handler.cc", "connection_options.cc", + "exactly_once_ack_handler.cc", "internal/batching_publisher_connection.cc", "internal/create_channel.cc", "internal/default_batch_sink.cc", "internal/defaults.cc", - "internal/exactly_once_ack_handler.cc", "internal/exactly_once_policies.cc", "internal/extend_leases_with_retry.cc", "internal/flow_controlled_publisher_connection.cc", diff --git a/google/cloud/pubsub/google_cloud_cpp_pubsub_mocks.bzl b/google/cloud/pubsub/google_cloud_cpp_pubsub_mocks.bzl index ba5c118f170bb..3a79eaea55bf2 100644 --- a/google/cloud/pubsub/google_cloud_cpp_pubsub_mocks.bzl +++ b/google/cloud/pubsub/google_cloud_cpp_pubsub_mocks.bzl @@ -18,6 +18,7 @@ google_cloud_cpp_pubsub_mocks_hdrs = [ "mocks/mock_ack_handler.h", + "mocks/mock_exactly_once_ack_handler.h", "mocks/mock_publisher_connection.h", "mocks/mock_schema_admin_connection.h", "mocks/mock_subscription_admin_connection.h", diff --git a/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc b/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc index 935093813b237..2571a13384c8e 100644 --- a/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc +++ b/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc @@ -60,6 +60,8 @@ class SubscriberIntegrationTest project_id, pubsub_testing::RandomSubscriptionId(generator_)); ordered_subscription_ = Subscription( project_id, pubsub_testing::RandomSubscriptionId(generator_)); + exactly_once_subscription_ = Subscription( + project_id, pubsub_testing::RandomSubscriptionId(generator_)); auto topic_admin = TopicAdminClient(MakeTopicAdminConnection()); auto subscription_admin = @@ -68,11 +70,13 @@ class SubscriberIntegrationTest auto topic_metadata = topic_admin.CreateTopic(TopicBuilder(topic_)); ASSERT_THAT(topic_metadata, AnyOf(IsOk(), StatusIs(StatusCode::kAlreadyExists))); + auto subscription_metadata = subscription_admin.CreateSubscription( topic_, subscription_, SubscriptionBuilder{}.set_ack_deadline(std::chrono::seconds(10))); ASSERT_THAT(subscription_metadata, AnyOf(IsOk(), StatusIs(StatusCode::kAlreadyExists))); + auto ordered_subscription_metadata = subscription_admin.CreateSubscription( topic_, ordered_subscription_, SubscriptionBuilder{} @@ -80,6 +84,15 @@ class SubscriberIntegrationTest .enable_message_ordering(true)); ASSERT_THAT(ordered_subscription_metadata, AnyOf(IsOk(), StatusIs(StatusCode::kAlreadyExists))); + + auto exactly_once_subscription_metadata = + subscription_admin.CreateSubscription( + topic_, exactly_once_subscription_, + SubscriptionBuilder{} + .set_ack_deadline(std::chrono::seconds(30)) + .enable_exactly_once_delivery(true)); + ASSERT_THAT(ordered_subscription_metadata, + AnyOf(IsOk(), StatusIs(StatusCode::kAlreadyExists))); } void TearDown() override { @@ -87,6 +100,10 @@ class SubscriberIntegrationTest auto subscription_admin = SubscriptionAdminClient(MakeSubscriptionAdminConnection()); + auto delete_exactly_once_subscription = + subscription_admin.DeleteSubscription(exactly_once_subscription_); + EXPECT_THAT(delete_exactly_once_subscription, + AnyOf(IsOk(), StatusIs(StatusCode::kNotFound))); auto delete_ordered_subscription = subscription_admin.DeleteSubscription(ordered_subscription_); EXPECT_THAT(delete_ordered_subscription, @@ -103,6 +120,7 @@ class SubscriberIntegrationTest Topic topic_ = Topic("unused", "unused"); Subscription subscription_ = Subscription("unused", "unused"); Subscription ordered_subscription_ = Subscription("unused", "unused"); + Subscription exactly_once_subscription_ = Subscription("unused", "unused"); }; void TestRoundtrip(pubsub::Publisher publisher, pubsub::Subscriber subscriber) { @@ -439,8 +457,7 @@ TEST_F(SubscriberIntegrationTest, PublishOrdered) { TEST_F(SubscriberIntegrationTest, UnifiedCredentials) { auto options = - google::cloud::Options{}.set( - google::cloud::MakeGoogleDefaultCredentials()); + Options{}.set(MakeGoogleDefaultCredentials()); auto const using_emulator = internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value(); if (using_emulator) { @@ -454,6 +471,56 @@ TEST_F(SubscriberIntegrationTest, UnifiedCredentials) { ASSERT_NO_FATAL_FAILURE(TestRoundtrip(publisher, subscriber)); } +TEST_F(SubscriberIntegrationTest, ExactlyOnce) { + auto publisher = Publisher(MakePublisherConnection(topic_)); + auto subscriber = + Subscriber(MakeSubscriberConnection(exactly_once_subscription_)); + + std::mutex mu; + std::map ids; + for (auto const* data : {"message-0", "message-1", "message-2"}) { + auto response = + publisher.Publish(MessageBuilder{}.SetData(data).Build()).get(); + EXPECT_STATUS_OK(response); + if (response) { + std::lock_guard lk(mu); + ids.emplace(*std::move(response), 0); + } + } + EXPECT_FALSE(ids.empty()); + + promise ids_empty; + auto callback = [&](pubsub::Message const& m, ExactlyOnceAckHandler h) { + SCOPED_TRACE("Search for message " + m.message_id()); + std::unique_lock lk(mu); + auto i = ids.find(m.message_id()); + ASSERT_FALSE(i == ids.end()); + if (i->second == 0) { + ++i->second; + lk.unlock(); + std::move(h).nack().then([id = m.message_id()](auto f) { + auto status = f.get(); + ASSERT_STATUS_OK(status) << " nack() failed for id=" << id; + }); + return; + } + ids.erase(i); + if (ids.empty()) ids_empty.set_value(); + lk.unlock(); + std::move(h).ack().then([id = m.message_id()](auto f) { + auto status = f.get(); + ASSERT_STATUS_OK(status) << " ack() failed for id=" << id; + }); + }; + + auto result = subscriber.Subscribe(callback); + // Wait until there are no more ids pending, then cancel the subscription and + // get its status. + ids_empty.get_future().get(); + result.cancel(); + EXPECT_STATUS_OK(result.get()); +} + /// @test Verify the backwards compatibility `v1` namespace still exists. TEST_F(SubscriberIntegrationTest, BackwardsCompatibility) { auto connection = ::google::cloud::pubsub::v1::MakeSubscriberConnection( diff --git a/google/cloud/pubsub/internal/subscription_concurrency_control.cc b/google/cloud/pubsub/internal/subscription_concurrency_control.cc index 2e6c5f5a65b1d..0c4c109a5af54 100644 --- a/google/cloud/pubsub/internal/subscription_concurrency_control.cc +++ b/google/cloud/pubsub/internal/subscription_concurrency_control.cc @@ -13,7 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/internal/subscription_concurrency_control.h" -#include "google/cloud/pubsub/ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/log.h" #include "absl/memory/memory.h" @@ -23,7 +23,7 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { -class AckHandlerImpl : public pubsub_internal::ExactlyOnceAckHandler::Impl { +class AckHandlerImpl : public pubsub::ExactlyOnceAckHandler::Impl { public: explicit AckHandlerImpl(std::weak_ptr w, std::string ack_id, std::int32_t delivery_attempt) diff --git a/google/cloud/pubsub/internal/subscription_concurrency_control.h b/google/cloud/pubsub/internal/subscription_concurrency_control.h index 6e9cc74e5aafa..064f10d87b7a0 100644 --- a/google/cloud/pubsub/internal/subscription_concurrency_control.h +++ b/google/cloud/pubsub/internal/subscription_concurrency_control.h @@ -15,7 +15,7 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SUBSCRIPTION_CONCURRENCY_CONTROL_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SUBSCRIPTION_CONCURRENCY_CONTROL_H -#include "google/cloud/pubsub/internal/exactly_once_ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/session_shutdown_manager.h" #include "google/cloud/pubsub/internal/subscription_message_source.h" #include "google/cloud/pubsub/message.h" @@ -30,7 +30,7 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN using Callback = std::function)>; + pubsub::Message, std::unique_ptr)>; class SubscriptionConcurrencyControl : public std::enable_shared_from_this { diff --git a/google/cloud/pubsub/internal/subscription_concurrency_control_test.cc b/google/cloud/pubsub/internal/subscription_concurrency_control_test.cc index fc45851ac1732..bcc4ad02c5173 100644 --- a/google/cloud/pubsub/internal/subscription_concurrency_control_test.cc +++ b/google/cloud/pubsub/internal/subscription_concurrency_control_test.cc @@ -13,12 +13,12 @@ // limitations under the License. #include "google/cloud/pubsub/internal/subscription_concurrency_control.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/subscription_session.h" #include "google/cloud/pubsub/testing/mock_subscription_message_source.h" #include "google/cloud/log.h" #include "google/cloud/testing_util/status_matchers.h" #include -#include #include #include #include @@ -30,6 +30,7 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::pubsub::ExactlyOnceAckHandler; using ::google::cloud::testing_util::IsOk; using ::testing::AtLeast; using ::testing::ByMove; diff --git a/google/cloud/pubsub/internal/subscription_session.cc b/google/cloud/pubsub/internal/subscription_session.cc index 1202cf6ac31ec..bde98f96f3af1 100644 --- a/google/cloud/pubsub/internal/subscription_session.cc +++ b/google/cloud/pubsub/internal/subscription_session.cc @@ -14,7 +14,7 @@ #include "google/cloud/pubsub/internal/subscription_session.h" #include "google/cloud/pubsub/ack_handler.h" -#include "google/cloud/pubsub/internal/exactly_once_ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/streaming_subscription_batch_source.h" #include "google/cloud/pubsub/internal/subscription_lease_management.h" #include "google/cloud/pubsub/internal/subscription_message_queue.h" @@ -27,11 +27,13 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { -class AckHandlerWrapper : public pubsub::AckHandler::Impl { +using ::google::cloud::pubsub::AckHandler; +using ::google::cloud::pubsub::ExactlyOnceAckHandler; + +class AckHandlerWrapper : public AckHandler::Impl { public: - explicit AckHandlerWrapper( - std::unique_ptr impl, - std::string message_id) + explicit AckHandlerWrapper(std::unique_ptr impl, + std::string message_id) : impl_(std::move(impl)), message_id_(std::move(message_id)) {} ~AckHandlerWrapper() override = default; @@ -54,7 +56,7 @@ class AckHandlerWrapper : public pubsub::AckHandler::Impl { } private: - std::unique_ptr impl_; + std::unique_ptr impl_; std::string message_id_; }; @@ -98,27 +100,27 @@ class SubscriptionSessionImpl std::shared_ptr shutdown_manager, std::shared_ptr source, pubsub::ApplicationCallback application_callback) { - return Create( - opts, std::move(cq), std::move(shutdown_manager), std::move(source), - [cb = std::move(application_callback)]( - pubsub::Message m, std::unique_ptr h) { - auto wrapper = absl::make_unique(std::move(h), - m.message_id()); - cb(std::move(m), pubsub::AckHandler(std::move(wrapper))); - }); + return Create(opts, std::move(cq), std::move(shutdown_manager), + std::move(source), + [cb = std::move(application_callback)]( + pubsub::Message m, + std::unique_ptr h) { + auto wrapper = absl::make_unique( + std::move(h), m.message_id()); + cb(std::move(m), pubsub::AckHandler(std::move(wrapper))); + }); } static future Create( Options const& opts, CompletionQueue cq, std::shared_ptr shutdown_manager, std::shared_ptr source, - pubsub_internal::ExactlyOnceApplicationCallback application_callback) { + pubsub::ExactlyOnceApplicationCallback application_callback) { return Create( opts, std::move(cq), std::move(shutdown_manager), std::move(source), [cb = std::move(application_callback)]( pubsub::Message m, std::unique_ptr h) { - cb(std::move(m), - pubsub_internal::ExactlyOnceAckHandler(std::move(h))); + cb(std::move(m), pubsub::ExactlyOnceAckHandler(std::move(h))); }); } @@ -241,7 +243,7 @@ future CreateSubscriptionSession( pubsub::Subscription const& subscription, Options const& opts, std::shared_ptr const& stub, CompletionQueue const& cq, std::string client_id, - pubsub_internal::ExactlyOnceApplicationCallback application_callback) { + pubsub::ExactlyOnceApplicationCallback application_callback) { auto shutdown_manager = std::make_shared(); auto batch = std::make_shared( cq, shutdown_manager, stub, subscription.FullName(), std::move(client_id), diff --git a/google/cloud/pubsub/internal/subscription_session.h b/google/cloud/pubsub/internal/subscription_session.h index 9c9f1ba332a3a..f3405eeff57e8 100644 --- a/google/cloud/pubsub/internal/subscription_session.h +++ b/google/cloud/pubsub/internal/subscription_session.h @@ -35,19 +35,6 @@ namespace cloud { namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -// TODO(#9327) - move this to application_callback.h -/** - * Defines the interface for application-level callbacks with exactly-once - * delivery. - * - * Applications provide a callable compatible with this type to receive - * messages. They acknowledge (or reject) messages using - * `ExactlyOnceAckHandler`. This is a move-only type to support asynchronous - * acknowledgments. - */ -using ExactlyOnceApplicationCallback = - std::function; - future CreateSubscriptionSession( pubsub::Subscription const& subscription, Options const& opts, std::shared_ptr const& stub, CompletionQueue const& cq, @@ -56,7 +43,8 @@ future CreateSubscriptionSession( future CreateSubscriptionSession( pubsub::Subscription const& subscription, Options const& opts, std::shared_ptr const& stub, CompletionQueue const& cq, - std::string client_id, ExactlyOnceApplicationCallback application_callback); + std::string client_id, + pubsub::ExactlyOnceApplicationCallback application_callback); GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal diff --git a/google/cloud/pubsub/internal/subscription_session_test.cc b/google/cloud/pubsub/internal/subscription_session_test.cc index 05de395025fc3..9a459ff645b6b 100644 --- a/google/cloud/pubsub/internal/subscription_session_test.cc +++ b/google/cloud/pubsub/internal/subscription_session_test.cc @@ -13,6 +13,8 @@ // limitations under the License. #include "google/cloud/pubsub/internal/subscription_session.h" +#include "google/cloud/pubsub/application_callback.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/defaults.h" #include "google/cloud/pubsub/subscriber_connection.h" #include "google/cloud/pubsub/testing/fake_streaming_pull.h" @@ -34,6 +36,7 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::pubsub::ExactlyOnceAckHandler; using ::google::cloud::pubsub_testing::FakeAsyncStreamingPull; using ::google::cloud::testing_util::AsyncSequencer; using ::google::cloud::testing_util::ScopedLog; @@ -56,7 +59,7 @@ future CreateTestSubscriptionSession( future CreateTestSubscriptionSession( pubsub::Subscription const& subscription, Options opts, std::shared_ptr const& mock, CompletionQueue const& cq, - ExactlyOnceApplicationCallback callback) { + pubsub::ExactlyOnceApplicationCallback callback) { opts = DefaultSubscriberOptions( pubsub_testing::MakeTestOptions(std::move(opts))); return CreateSubscriptionSession(subscription, std::move(opts), mock, cq, diff --git a/google/cloud/pubsub/mocks/mock_exactly_once_ack_handler.h b/google/cloud/pubsub/mocks/mock_exactly_once_ack_handler.h new file mode 100644 index 0000000000000..060e9877f867a --- /dev/null +++ b/google/cloud/pubsub/mocks/mock_exactly_once_ack_handler.h @@ -0,0 +1,49 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_MOCKS_MOCK_EXACTLY_ONCE_ACK_HANDLER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_MOCKS_MOCK_EXACTLY_ONCE_ACK_HANDLER_H + +#include "google/cloud/pubsub/exactly_once_ack_handler.h" +#include +#include + +namespace google { +namespace cloud { +namespace pubsub_mocks { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * A googlemock-based mock for + * [pubsub::ExactlyOnceAckHandler::Impl][mocked-link] + * + * [mocked-link]: @ref + * google::cloud::pubsub_internal::ExactlyOnceAckHandler::Impl + * + * @see @ref subscriber-mock for an example using this class. + */ +class MockExactlyOnceAckHandler : public pubsub::ExactlyOnceAckHandler::Impl { + public: + MOCK_METHOD(future, ack, (), (override)); + MOCK_METHOD(future, nack, (), (override)); + MOCK_METHOD(std::string, ack_id, ()); + MOCK_METHOD(std::int32_t, delivery_attempt, (), (const, override)); +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace pubsub_mocks +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_MOCKS_MOCK_EXACTLY_ONCE_ACK_HANDLER_H diff --git a/google/cloud/pubsub/mocks/mock_subscriber_connection.h b/google/cloud/pubsub/mocks/mock_subscriber_connection.h index a1bc423bb0cc6..bbfccbc38eba9 100644 --- a/google/cloud/pubsub/mocks/mock_subscriber_connection.h +++ b/google/cloud/pubsub/mocks/mock_subscriber_connection.h @@ -34,6 +34,9 @@ class MockSubscriberConnection : public pubsub::SubscriberConnection { public: MOCK_METHOD(future, Subscribe, (pubsub::SubscriberConnection::SubscribeParams), (override)); + MOCK_METHOD(future, ExactlyOnceSubscribe, + (pubsub::SubscriberConnection::ExactlyOnceSubscribeParams), + (override)); }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/pubsub_client_unit_tests.bzl b/google/cloud/pubsub/pubsub_client_unit_tests.bzl index 1fdc5ef10a594..2bd8c906bfd7e 100644 --- a/google/cloud/pubsub/pubsub_client_unit_tests.bzl +++ b/google/cloud/pubsub/pubsub_client_unit_tests.bzl @@ -18,10 +18,10 @@ pubsub_client_unit_tests = [ "ack_handler_test.cc", + "exactly_once_ack_handler_test.cc", "internal/batching_publisher_connection_test.cc", "internal/default_batch_sink_test.cc", "internal/defaults_test.cc", - "internal/exactly_once_ack_handler_test.cc", "internal/exactly_once_policies_test.cc", "internal/extend_leases_with_retry_test.cc", "internal/flow_controlled_publisher_connection_test.cc", diff --git a/google/cloud/pubsub/samples/mock_subscriber.cc b/google/cloud/pubsub/samples/mock_subscriber.cc index 7fbbf462822eb..ca42334d159ca 100644 --- a/google/cloud/pubsub/samples/mock_subscriber.cc +++ b/google/cloud/pubsub/samples/mock_subscriber.cc @@ -15,6 +15,7 @@ //! [all] //! [required-includes] #include "google/cloud/pubsub/mocks/mock_ack_handler.h" +#include "google/cloud/pubsub/mocks/mock_exactly_once_ack_handler.h" #include "google/cloud/pubsub/mocks/mock_subscriber_connection.h" #include "google/cloud/pubsub/subscriber.h" #include "absl/memory/memory.h" @@ -99,3 +100,66 @@ TEST(MockSubscribeExample, Subscribe) { } // namespace //! [all] + +namespace { + +//! [exactly-once] +using ::google::cloud::Status; +using ::google::cloud::pubsub_mocks::MockExactlyOnceAckHandler; +using ::testing::ByMove; + +TEST(MockSubscribeExample, ExactlyOnceSubscribe) { + auto mock = std::make_shared(); + + // Generate 3 messages in a separate thread and then close the + // subscription with success. + auto generator = + [](promise promise, + pubsub::SubscriberConnection::ExactlyOnceSubscribeParams const& + params) { + for (int i = 0; i != 3; ++i) { + auto mock_handler = absl::make_unique(); + EXPECT_CALL(*mock_handler, ack_id) + .WillRepeatedly(Return("ack-id-" + std::to_string(i))); + EXPECT_CALL(*mock_handler, ack) + .WillOnce(Return(ByMove(make_ready_future(Status{})))); + + // Simulate callbacks + params.callback( + pubsub::MessageBuilder{} + .SetData("message-" + std::to_string(i)) + .Build(), + pubsub::ExactlyOnceAckHandler(std::move(mock_handler))); + } + // Close the stream with a successful error code + promise.set_value({}); + }; + + EXPECT_CALL(*mock, ExactlyOnceSubscribe) + .WillOnce( + [&](pubsub::SubscriberConnection::ExactlyOnceSubscribeParams params) { + promise p; + auto result = p.get_future(); + // start the generator in a separate thread. + (void)std::async(std::launch::async, generator, std::move(p), + std::move(params)); + return result; + }); + + pubsub::Subscriber subscriber(mock); + + std::vector payloads; + auto callback = [&](pubsub::Message const& m, + pubsub::ExactlyOnceAckHandler h) { + payloads.push_back(m.data()); + std::move(h).ack(); + }; + auto session = subscriber.Subscribe(callback); + + EXPECT_TRUE(session.get().ok()); + EXPECT_THAT(payloads, + UnorderedElementsAre("message-0", "message-1", "message-2")); +} +//! [exactly-once] + +} // namespace \ No newline at end of file diff --git a/google/cloud/pubsub/samples/samples.cc b/google/cloud/pubsub/samples/samples.cc index 1317cf491a98d..bea44fa13731c 100644 --- a/google/cloud/pubsub/samples/samples.cc +++ b/google/cloud/pubsub/samples/samples.cc @@ -287,6 +287,30 @@ void CreateSubscription(google::cloud::pubsub::SubscriptionAdminClient client, (std::move(client), argv.at(0), argv.at(1), argv.at(2)); } +void CreateSubscriptionWithExactlyOnceDelivery( + google::cloud::pubsub::SubscriptionAdminClient client, + std::vector const& argv) { + // [START pubsub_create_subscription_with_exactly_once_delivery] + namespace pubsub = ::google::cloud::pubsub; + [](pubsub::SubscriptionAdminClient client, std::string const& project_id, + std::string const& topic_id, std::string const& subscription_id) { + auto sub = client.CreateSubscription( + pubsub::Topic(project_id, topic_id), + pubsub::Subscription(project_id, subscription_id), + pubsub::SubscriptionBuilder{}.enable_exactly_once_delivery(true)); + if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) { + std::cout << "The subscription already exists\n"; + return; + } + if (!sub) throw std::runtime_error(sub.status().message()); + + std::cout << "The subscription was successfully created: " + << sub->DebugString() << "\n"; + } + // [END pubsub_create_subscription_with_exactly_once_delivery] + (std::move(client), argv.at(0), argv.at(1), argv.at(2)); +} + void CreateFilteredSubscription( google::cloud::pubsub::SubscriptionAdminClient client, std::vector const& argv) { @@ -1285,6 +1309,29 @@ void Subscribe(google::cloud::pubsub::Subscriber subscriber, sample(std::move(subscriber)), __func__); } +void ExactlyOnceSubscribe(google::cloud::pubsub::Subscriber subscriber, + std::vector const&) { + auto const initial = EventCounter::Instance().Current(); + //! [START pubsub_subscriber_exactly_once] [exactly-once-subscribe] + namespace pubsub = ::google::cloud::pubsub; + auto sample = [](pubsub::Subscriber subscriber) { + return subscriber.Subscribe( + [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) { + std::cout << "Received message " << m << "\n"; + std::move(h).ack().then([id = m.message_id()](auto f) { + auto status = f.get(); + std::cout << "Message id " << id + << " ack() completed with status=" << status << "\n"; + }); + PleaseIgnoreThisSimplifiesTestingTheSamples(); + }); + }; + //! [END pubsub_subscriber_exactly_once] [exactly-once-subscribe] + EventCounter::Instance().Wait( + [initial](std::int64_t count) { return count > initial; }, + sample(std::move(subscriber)), __func__); +} + void SubscribeErrorListener(google::cloud::pubsub::Subscriber subscriber, std::vector const&) { auto current = EventCounter::Instance().Current(); @@ -1972,6 +2019,7 @@ void AutoRun(std::vector const& argv) { auto generator = google::cloud::internal::MakeDefaultPRNG(); auto const topic_id = RandomTopicId(generator); auto const subscription_id = RandomSubscriptionId(generator); + auto const exactly_once_subscription_id = RandomSubscriptionId(generator); auto const filtered_subscription_id = RandomSubscriptionId(generator); auto const push_subscription_id = RandomSubscriptionId(generator); auto const ordering_subscription_id = RandomSubscriptionId(generator); @@ -2023,6 +2071,20 @@ void AutoRun(std::vector const& argv) { CreateFilteredSubscription(subscription_admin_client, {project_id, topic_id, filtered_subscription_id}); + std::cout + << "\nRunning CreateSubscriptionWithExactlyOnceDelivery() sample [1]" + << std::endl; + CreateSubscriptionWithExactlyOnceDelivery( + subscription_admin_client, + {project_id, topic_id, exactly_once_subscription_id}); + + std::cout + << "\nRunning CreateSubscriptionWithExactlyOnceDelivery() sample [2]" + << std::endl; + CreateSubscriptionWithExactlyOnceDelivery( + subscription_admin_client, + {project_id, topic_id, exactly_once_subscription_id}); + std::cout << "\nRunning ListTopicSubscriptions() sample" << std::endl; ListTopicSubscriptions(topic_admin_client, {project_id, topic_id}); @@ -2137,6 +2199,11 @@ void AutoRun(std::vector const& argv) { google::cloud::pubsub::MakeSubscriberConnection( dead_letter_subscription)); + auto exactly_once_subscriber = google::cloud::pubsub::Subscriber( + google::cloud::pubsub::MakeSubscriberConnection( + google::cloud::pubsub::Subscription(project_id, + exactly_once_subscription_id))); + auto filtered_subscriber = google::cloud::pubsub::Subscriber( google::cloud::pubsub::MakeSubscriberConnection( google::cloud::pubsub::Subscription(project_id, @@ -2151,6 +2218,9 @@ void AutoRun(std::vector const& argv) { std::cout << "\nRunning Subscribe() sample" << std::endl; Subscribe(subscriber, {}); + std::cout << "\nRunning ExactlyOnceSubscribe() sample" << std::endl; + ExactlyOnceSubscribe(exactly_once_subscriber, {}); + std::cout << "\nRunning Subscribe(filtered) sample" << std::endl; PublishHelper(publisher, "Subscribe(filtered)", 8); Subscribe(filtered_subscriber, {}); @@ -2296,6 +2366,10 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) "create-filtered-subscription", {"project-id", "topic-id", "subscription-id"}, CreateFilteredSubscription), + CreateSubscriptionAdminCommand( + "create-subscription-with-exactly-once-delivery", + {"project-id", "topic-id", "subscription-id"}, + CreateSubscriptionWithExactlyOnceDelivery), CreateSubscriptionAdminCommand( "create-push-subscription", {"project-id", "topic-id", "subscription-id", "endpoint"}, @@ -2388,6 +2462,8 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) CreatePublisherCommand("publish-ordering-key", {}, PublishOrderingKey), CreatePublisherCommand("resume-ordering-key", {}, ResumeOrderingKey), CreateSubscriberCommand("subscribe", {}, Subscribe), + CreateSubscriberCommand("exactly-once-subscribe", {}, + ExactlyOnceSubscribe), CreateSubscriberCommand("subscribe-error-listener", {}, SubscribeErrorListener), CreateSubscriberCommand("subscribe-custom-attributes", {}, diff --git a/google/cloud/pubsub/subscriber.h b/google/cloud/pubsub/subscriber.h index 691408a2ae48a..90c3ff091c4a2 100644 --- a/google/cloud/pubsub/subscriber.h +++ b/google/cloud/pubsub/subscriber.h @@ -15,6 +15,8 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H +#include "google/cloud/pubsub/ack_handler.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/message.h" #include "google/cloud/pubsub/subscriber_connection.h" #include "google/cloud/pubsub/subscription.h" @@ -94,9 +96,6 @@ class Subscriber { /** * Creates a new session to receive messages from @p subscription. * - * @note Callable must be `CopyConstructible`, as @p cb will be stored in a - * [`std::function<>`][std-function-link]. - * * @par Idempotency * @parblock * This is an idempotent operation; it only reads messages from the service. @@ -119,14 +118,42 @@ class Subscriber { * messages. For example, because there was an unrecoverable error trying * to receive data. Calling `.cancel()` in this object will (eventually) * terminate the session and satisfy the future. - * - * [std-function-link]: - * https://en.cppreference.com/w/cpp/utility/functional/function */ - future Subscribe(std::function cb) { + future Subscribe(ApplicationCallback cb) { return connection_->Subscribe({std::move(cb)}); } + /** + * Creates a new session to receive messages from @p subscription using + * exactly-once delivery. + * + * @par Idempotency + * @parblock + * This is an idempotent operation; it only reads messages from the service. + * Will make multiple attempts to start a connection to the service, subject + * to the retry policies configured in the `SubscriberConnection`. Once a + * successful connection is established the library will try to resume the + * connection even if the connection fails with a permanent error. Resuming + * the connection is subject to the retry policies as described earlier. + * + * Note that calling `ExactlyOnceAckHandler::ack()` and/or + * `ExactlyOnceAckHandler::nack()` have their own rules with respect to + * retrying. Check the documentation of these functions for details. + * @endparblock + * + * @par Example + * @snippet samples.cc exactly-once-subscribe + * + * @param cb the callable invoked when messages are received. + * @return a future that is satisfied when the session will no longer receive + * messages. For example, because there was an unrecoverable error trying + * to receive data. Calling `.cancel()` in this object will (eventually) + * terminate the session and satisfy the future. + */ + future Subscribe(ExactlyOnceApplicationCallback cb) { + return connection_->ExactlyOnceSubscribe({std::move(cb)}); + } + private: std::shared_ptr connection_; }; diff --git a/google/cloud/pubsub/subscriber_connection.cc b/google/cloud/pubsub/subscriber_connection.cc index ab381ffe3224a..6cb4a76dcbd6c 100644 --- a/google/cloud/pubsub/subscriber_connection.cc +++ b/google/cloud/pubsub/subscriber_connection.cc @@ -47,18 +47,25 @@ class SubscriberConnectionImpl : public pubsub::SubscriberConnection { ~SubscriberConnectionImpl() override = default; future Subscribe(SubscribeParams p) override { - auto client_id = [this] { - std::lock_guard lk(mu_); - auto constexpr kLength = 32; - auto constexpr kChars = "abcdefghijklmnopqrstuvwxyz0123456789"; - return internal::Sample(generator_, kLength, kChars); - }(); return CreateSubscriptionSession(subscription_, opts_, stub_, - background_->cq(), std::move(client_id), + background_->cq(), MakeClientId(), + std::move(p.callback)); + } + + future ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p) override { + return CreateSubscriptionSession(subscription_, opts_, stub_, + background_->cq(), MakeClientId(), std::move(p.callback)); } private: + std::string MakeClientId() { + std::lock_guard lk(mu_); + auto constexpr kLength = 32; + auto constexpr kChars = "abcdefghijklmnopqrstuvwxyz0123456789"; + return internal::Sample(generator_, kLength, kChars); + } + pubsub::Subscription const subscription_; Options const opts_; std::shared_ptr stub_; @@ -99,6 +106,13 @@ future SubscriberConnection::Subscribe(SubscribeParams) { Status{StatusCode::kUnimplemented, "needs-override"}); } +future SubscriberConnection::ExactlyOnceSubscribe( + // NOLINTNEXTLINE(performance-unnecessary-value-param) + ExactlyOnceSubscribeParams) { + return make_ready_future( + Status{StatusCode::kUnimplemented, "needs-override"}); +} + std::shared_ptr MakeSubscriberConnection( Subscription subscription, std::initializer_list) { diff --git a/google/cloud/pubsub/subscriber_connection.h b/google/cloud/pubsub/subscriber_connection.h index 6798326766f08..29e001d21694f 100644 --- a/google/cloud/pubsub/subscriber_connection.h +++ b/google/cloud/pubsub/subscriber_connection.h @@ -63,8 +63,21 @@ class SubscriberConnection { ApplicationCallback callback; }; - /// Defines the interface for `Subscriber::Subscribe()` + /// Defines the interface for `Subscriber::Subscribe(ApplicationCallback)` virtual future Subscribe(SubscribeParams p); + + struct ExactlyOnceSubscribeParams { + ExactlyOnceApplicationCallback callback; + }; + + /** + * Defines the interface for + * `Subscriber::Subscribe(ExactlyOnceApplicationCallback)`. + * + * We use a different name for this function (as opposed to an overload) to + * simplify the use of mocks. + */ + virtual future ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p); }; /** diff --git a/google/cloud/pubsub/subscriber_connection_test.cc b/google/cloud/pubsub/subscriber_connection_test.cc index a3eb58f2b4dd5..85e90df2da261 100644 --- a/google/cloud/pubsub/subscriber_connection_test.cc +++ b/google/cloud/pubsub/subscriber_connection_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/subscriber_connection.h" +#include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/defaults.h" #include "google/cloud/pubsub/testing/fake_streaming_pull.h" #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" @@ -97,6 +98,51 @@ TEST(SubscriberConnectionTest, Basic) { t.join(); } +TEST(SubscriberConnectionTest, ExactlyOnce) { + auto mock = std::make_shared(); + Subscription const subscription("test-project", "test-subscription"); + EXPECT_CALL(*mock, AsyncModifyAckDeadline) + .WillRepeatedly([](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::ModifyAckDeadlineRequest const&) { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*mock, AsyncAcknowledge) + .WillOnce([](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::AcknowledgeRequest const& request) { + EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0")); + return make_ready_future( + Status{StatusCode::kUnknown, "test-only-unknown"}); + }); + EXPECT_CALL(*mock, AsyncStreamingPull) + .Times(AtLeast(1)) + .WillRepeatedly(FakeAsyncStreamingPull); + + CompletionQueue cq; + auto subscriber = MakeTestSubscriberConnection(subscription, mock, + UserSuppliedThreadsOption(cq)); + std::atomic_flag received_one{false}; + promise waiter; + auto callback = [&](Message const& m, ExactlyOnceAckHandler h) { + if (received_one.test_and_set()) return; + EXPECT_THAT(m.message_id(), StartsWith("test-message-id-")); + auto status = std::move(h).ack().get(); + EXPECT_THAT(status, StatusIs(StatusCode::kUnknown, "test-only-unknown")); + waiter.set_value(); + }; + std::thread t([&cq] { cq.Run(); }); + auto response = subscriber->ExactlyOnceSubscribe({callback}); + waiter.get_future().wait(); + response.cancel(); + ASSERT_STATUS_OK(response.get()); + // We need to explicitly cancel any pending timers (some of which may be quite + // long) left by the subscription. + cq.CancelAll(); + cq.Shutdown(); + t.join(); +} + TEST(SubscriberConnectionTest, PullFailure) { auto mock = std::make_shared(); Subscription const subscription("test-project", "test-subscription");