diff --git a/google/cloud/pubsub/CMakeLists.txt b/google/cloud/pubsub/CMakeLists.txt index d5be3b1b77abd..4fb8122420397 100644 --- a/google/cloud/pubsub/CMakeLists.txt +++ b/google/cloud/pubsub/CMakeLists.txt @@ -139,6 +139,7 @@ add_library( publisher_options.h pull_ack_handler.cc pull_ack_handler.h + pull_response.h retry_policy.h schema.cc schema.h diff --git a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl index be6bc6c68c442..c4e4ac33e94e0 100644 --- a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl +++ b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl @@ -72,6 +72,7 @@ google_cloud_cpp_pubsub_hdrs = [ "publisher_connection.h", "publisher_options.h", "pull_ack_handler.h", + "pull_response.h", "retry_policy.h", "schema.h", "schema_admin_client.h", diff --git a/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc b/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc index 930baa99e61fc..9d023f7d3bb0d 100644 --- a/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc +++ b/google/cloud/pubsub/integration_tests/subscriber_integration_test.cc @@ -523,6 +523,32 @@ TEST_F(SubscriberIntegrationTest, ExactlyOnce) { EXPECT_STATUS_OK(result.get()); } +TEST_F(SubscriberIntegrationTest, BlockingPull) { + auto publisher = Publisher(MakePublisherConnection(topic_)); + auto subscriber = + Subscriber(MakeSubscriberConnection(exactly_once_subscription_)); + + std::set ids; + for (auto const* data : {"message-0", "message-1", "message-2"}) { + auto response = + publisher.Publish(MessageBuilder{}.SetData(data).Build()).get(); + EXPECT_STATUS_OK(response); + if (response) ids.insert(*std::move(response)); + } + EXPECT_THAT(ids, Not(IsEmpty())); + + auto const count = 2 * ids.size(); + for (std::size_t i = 0; i != count && !ids.empty(); ++i) { + auto response = subscriber.Pull(); + EXPECT_STATUS_OK(response); + if (!response) continue; + auto ack = std::move(response->handler).ack().get(); + EXPECT_STATUS_OK(ack); + ids.erase(response->message.message_id()); + } + EXPECT_THAT(ids, IsEmpty()); +} + /// @test Verify the backwards compatibility `v1` namespace still exists. TEST_F(SubscriberIntegrationTest, BackwardsCompatibility) { auto connection = ::google::cloud::pubsub::v1::MakeSubscriberConnection( diff --git a/google/cloud/pubsub/internal/subscriber_connection_impl.cc b/google/cloud/pubsub/internal/subscriber_connection_impl.cc index cdfbf50f61f21..efa3da9cd5aac 100644 --- a/google/cloud/pubsub/internal/subscriber_connection_impl.cc +++ b/google/cloud/pubsub/internal/subscriber_connection_impl.cc @@ -48,8 +48,7 @@ future SubscriberConnectionImpl::ExactlyOnceSubscribe( background_->cq(), MakeClientId(), std::move(p.callback)); } -StatusOr -SubscriberConnectionImpl::Pull() { +StatusOr SubscriberConnectionImpl::Pull() { google::pubsub::v1::PullRequest request; request.set_subscription(subscription_.FullName()); request.set_max_messages(1); @@ -76,8 +75,8 @@ SubscriberConnectionImpl::Pull() { received_message.delivery_attempt()); auto message = pubsub_internal::FromProto( std::move(*received_message.mutable_message())); - return PullResponse{pubsub::PullAckHandler(std::move(impl)), - std::move(message)}; + return pubsub::PullResponse{pubsub::PullAckHandler(std::move(impl)), + std::move(message)}; } Options SubscriberConnectionImpl::options() { return opts_; } diff --git a/google/cloud/pubsub/internal/subscriber_connection_impl.h b/google/cloud/pubsub/internal/subscriber_connection_impl.h index b9ad793f501af..d10f1bf21aab1 100644 --- a/google/cloud/pubsub/internal/subscriber_connection_impl.h +++ b/google/cloud/pubsub/internal/subscriber_connection_impl.h @@ -17,7 +17,6 @@ #include "google/cloud/pubsub/ack_handler.h" #include "google/cloud/pubsub/message.h" -#include "google/cloud/pubsub/pull_ack_handler.h" #include "google/cloud/pubsub/subscriber_connection.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" @@ -39,13 +38,7 @@ class SubscriberConnectionImpl : public pubsub::SubscriberConnection { future ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p) override; - // TODO(#7187) - move to pubsub::SubscriberConnection - struct PullResponse { - pubsub::PullAckHandler handler; - pubsub::Message message; - }; - - StatusOr Pull(); + StatusOr Pull() override; Options options() override; diff --git a/google/cloud/pubsub/internal/subscription_session_test.cc b/google/cloud/pubsub/internal/subscription_session_test.cc index 6ca89602499d3..62205a212288b 100644 --- a/google/cloud/pubsub/internal/subscription_session_test.cc +++ b/google/cloud/pubsub/internal/subscription_session_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/internal/subscription_session.h" +#include "google/cloud/pubsub/ack_handler.h" #include "google/cloud/pubsub/application_callback.h" #include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/defaults.h" @@ -20,7 +21,6 @@ #include "google/cloud/pubsub/testing/fake_streaming_pull.h" #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" -#include "google/cloud/log.h" #include "google/cloud/testing_util/async_sequencer.h" #include "google/cloud/testing_util/fake_completion_queue_impl.h" #include "google/cloud/testing_util/scoped_log.h" diff --git a/google/cloud/pubsub/pull_response.h b/google/cloud/pubsub/pull_response.h new file mode 100644 index 0000000000000..1fa9f0312d5ca --- /dev/null +++ b/google/cloud/pubsub/pull_response.h @@ -0,0 +1,55 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H + +#include "google/cloud/pubsub/message.h" +#include "google/cloud/pubsub/pull_ack_handler.h" +#include "google/cloud/pubsub/version.h" + +namespace google { +namespace cloud { +namespace pubsub { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * The response for a blocking pull. + * + * If the application invokes `handler.nack()` or allows `handler` to go out + * of scope, then the service will redeliver the message. + * + * With exactly-once delivery subscriptions, the service will stop + * redelivering the message once the application invokes `handler.ack()` and + * the invocation succeeds. With best-efforts subscriptions, the service *may* + * redeliver the message, even after a successful `handler.ack()` invocation. + * + * If `handler` is not an rvalue, you may need to use `std::move(handler).ack()` + * or `std::move(handler).nack()`. + * + * @see https://cloud.google.com/pubsub/docs/exactly-once-delivery + */ +struct PullResponse { + /// The ack/nack handler associated with this message. + pubsub::PullAckHandler handler; + /// The message attributes and payload. + pubsub::Message message; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace pubsub +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H diff --git a/google/cloud/pubsub/samples/samples.cc b/google/cloud/pubsub/samples/samples.cc index bc71db24bd058..7820429a91c61 100644 --- a/google/cloud/pubsub/samples/samples.cc +++ b/google/cloud/pubsub/samples/samples.cc @@ -1358,6 +1358,19 @@ void ExactlyOnceSubscribe(google::cloud::pubsub::Subscriber subscriber, sample(std::move(subscriber)), __func__); } +void Pull(google::cloud::pubsub::Subscriber subscriber, + std::vector const&) { + //! [pull] + [](google::cloud::pubsub::Subscriber subscriber) { + auto response = subscriber.Pull(); + if (!response) throw std::move(response).status(); + std::cout << "Received message " << response->message << "\n"; + std::move(response->handler).ack(); + } + //! [pull] + (std::move(subscriber)); +} + void SubscribeErrorListener(google::cloud::pubsub::Subscriber subscriber, std::vector const&) { auto current = EventCounter::Instance().Current(); @@ -2248,6 +2261,10 @@ void AutoRun(std::vector const& argv) { std::cout << "\nRunning ExactlyOnceSubscribe() sample" << std::endl; ExactlyOnceSubscribe(exactly_once_subscriber, {}); + std::cout << "\nRunning Pull() sample" << std::endl; + PublishHelper(publisher, "Pull()", 1); + Pull(subscriber, {}); + std::cout << "\nRunning Subscribe(filtered) sample" << std::endl; PublishHelper(publisher, "Subscribe(filtered)", 8); Subscribe(filtered_subscriber, {}); @@ -2499,6 +2516,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) CreateSubscriberCommand("subscribe", {}, Subscribe), CreateSubscriberCommand("exactly-once-subscribe", {}, ExactlyOnceSubscribe), + CreateSubscriberCommand("pull", {}, Pull), CreateSubscriberCommand("subscribe-error-listener", {}, SubscribeErrorListener), CreateSubscriberCommand("subscribe-custom-attributes", {}, diff --git a/google/cloud/pubsub/subscriber.cc b/google/cloud/pubsub/subscriber.cc index cd829894280b2..b0f67dea5a90a 100644 --- a/google/cloud/pubsub/subscriber.cc +++ b/google/cloud/pubsub/subscriber.cc @@ -36,6 +36,11 @@ future Subscriber::Subscribe(ExactlyOnceApplicationCallback cb, return connection_->ExactlyOnceSubscribe({std::move(cb)}); } +StatusOr Subscriber::Pull(Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); + return connection_->Pull(); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub } // namespace cloud diff --git a/google/cloud/pubsub/subscriber.h b/google/cloud/pubsub/subscriber.h index c2de07a210556..d4121e3a09b88 100644 --- a/google/cloud/pubsub/subscriber.h +++ b/google/cloud/pubsub/subscriber.h @@ -157,6 +157,30 @@ class Subscriber { future Subscribe(ExactlyOnceApplicationCallback cb, Options opts = {}); + /** + * Pulls one message from @p subscription. + * + * @par Idempotency + * @parblock + * This is an idempotent operation; it only reads messages from the service. + * It will make multiple attempts to pull a message from the service, subject + * to the retry policies configured in the `SubscriberConnection`. + * + * Note that calling `PullAckHandler::ack()` and/or `PullAckHandler::nack()` + * have their own rules with respect to retrying. + * @endparblock + * + * @par Example + * @snippet samples.cc pull + * + * @param opts any option overrides to use in this call. These options take + * precedence over the options passed in the constructor, and over any + * options provided in the `PublisherConnection` initialization. + * @return a response including the message and a `PullAckHandler` to notify + * the library when the message has been successfully handled. + */ + StatusOr Pull(Options opts = {}); + private: std::shared_ptr connection_; google::cloud::Options options_; diff --git a/google/cloud/pubsub/subscriber_connection.cc b/google/cloud/pubsub/subscriber_connection.cc index 18dcbca03c79b..4281a5bf029e1 100644 --- a/google/cloud/pubsub/subscriber_connection.cc +++ b/google/cloud/pubsub/subscriber_connection.cc @@ -24,6 +24,7 @@ #include "google/cloud/pubsub/options.h" #include "google/cloud/pubsub/retry_policy.h" #include "google/cloud/credentials.h" +#include "google/cloud/internal/make_status.h" #include "google/cloud/internal/random.h" #include "google/cloud/log.h" #include @@ -63,14 +64,18 @@ SubscriberConnection::~SubscriberConnection() = default; // NOLINTNEXTLINE(performance-unnecessary-value-param) future SubscriberConnection::Subscribe(SubscribeParams) { return make_ready_future( - Status{StatusCode::kUnimplemented, "needs-override"}); + internal::UnimplementedError("needs-override", GCP_ERROR_INFO())); } future SubscriberConnection::ExactlyOnceSubscribe( // NOLINTNEXTLINE(performance-unnecessary-value-param) ExactlyOnceSubscribeParams) { return make_ready_future( - Status{StatusCode::kUnimplemented, "needs-override"}); + internal::UnimplementedError("needs-override", GCP_ERROR_INFO())); +} + +StatusOr SubscriberConnection::Pull() { + return internal::UnimplementedError("needs-override", GCP_ERROR_INFO()); } std::shared_ptr MakeSubscriberConnection( diff --git a/google/cloud/pubsub/subscriber_connection.h b/google/cloud/pubsub/subscriber_connection.h index 8538a79b467c3..01386cfaba3df 100644 --- a/google/cloud/pubsub/subscriber_connection.h +++ b/google/cloud/pubsub/subscriber_connection.h @@ -15,12 +15,12 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_CONNECTION_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_CONNECTION_H -#include "google/cloud/pubsub/ack_handler.h" #include "google/cloud/pubsub/application_callback.h" #include "google/cloud/pubsub/backoff_policy.h" #include "google/cloud/pubsub/connection_options.h" #include "google/cloud/pubsub/internal/subscriber_stub.h" #include "google/cloud/pubsub/message.h" +#include "google/cloud/pubsub/pull_response.h" #include "google/cloud/pubsub/retry_policy.h" #include "google/cloud/pubsub/subscriber_options.h" #include "google/cloud/pubsub/subscription.h" @@ -79,6 +79,8 @@ class SubscriberConnection { */ virtual future ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p); + virtual StatusOr Pull(); + /// Returns the configuration parameters for this object virtual Options options() { return Options{}; } }; diff --git a/google/cloud/pubsub/subscriber_connection_test.cc b/google/cloud/pubsub/subscriber_connection_test.cc index 8e256f3a6c0e1..58575f1d0baf2 100644 --- a/google/cloud/pubsub/subscriber_connection_test.cc +++ b/google/cloud/pubsub/subscriber_connection_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/subscriber_connection.h" +#include "google/cloud/pubsub/ack_handler.h" #include "google/cloud/pubsub/exactly_once_ack_handler.h" #include "google/cloud/pubsub/internal/defaults.h" #include "google/cloud/pubsub/testing/fake_streaming_pull.h"