// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H
#include "google/cloud/pubsub/connection_options.h"
#include "google/cloud/pubsub/publisher_connection.h"
#include "google/cloud/pubsub/publisher_options.h"
#include "google/cloud/pubsub/version.h"
#include <memory>
#include <string>
#include <utility>
namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
/**
* Publish messages to the Cloud Pub/Sub service.
*
* This class is used to publish messages to a fixed topic, with a fixed
* configuration such as credentials, batching, background threads, etc.
* Applications that publish messages to multiple topics need to create separate
* instances of this class. Applications wanting to publish events with
* different batching configuration also need to create separate instances.
*
* @see https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub
* service.
*
* @par Example
* @snippet samples.cc publish
*
* @par Message Ordering
* A `Publisher` configured to preserve message ordering will sequence the
* messages that share a common ordering key (see
* `MessageBuilder::SetOrderingKey()`). Messages will be batched by ordering
* key, and new batches will wait until the status of the previous batch is
* known. On an error, all pending and queued messages are discarded, and the
* publisher rejects any new messages for the ordering key that experienced
* problems. The application must call `Publisher::ResumePublish()` to
* restore publishing.
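*
* @par Sketch: blocking and resuming an ordering key
* The rule above can be modelled with a small toy class. This is only a
* sketch under stated assumptions, not the library's implementation:
* `ToyOrderedPublisher`, its members, and `DemoOrdering()` are hypothetical
* names, and the `send_ok` flag stands in for the outcome of a real RPC.

```cpp
#include <cassert>
#include <set>
#include <string>

// Toy model (hypothetical, not the library's code): remembers which ordering
// keys are blocked after a failed publish.
class ToyOrderedPublisher {
 public:
  // Returns true if the message is accepted and sent. `send_ok` simulates
  // the outcome of sending the batch for this key.
  bool Publish(std::string const& ordering_key, bool send_ok) {
    if (blocked_.count(ordering_key) != 0) return false;  // rejected
    if (!send_ok) blocked_.insert(ordering_key);  // key is now blocked
    return send_ok;
  }

  // Clears the failure state for one ordering key.
  void ResumePublish(std::string const& ordering_key) {
    blocked_.erase(ordering_key);
  }

 private:
  std::set<std::string> blocked_;
};

// Walks through the sequence described in the paragraph above.
inline void DemoOrdering() {
  ToyOrderedPublisher p;
  bool const ok_before = p.Publish("key-a", true);    // normal publish
  bool const ok_failing = p.Publish("key-a", false);  // simulated failure
  bool const ok_blocked = p.Publish("key-a", true);   // rejected: key blocked
  bool const ok_other = p.Publish("key-b", true);     // other keys unaffected
  p.ResumePublish("key-a");
  bool const ok_resumed = p.Publish("key-a", true);   // accepted again
  assert(ok_before && !ok_failing && !ok_blocked && ok_other && ok_resumed);
}
```

* In the toy, one failure blocks only its own ordering key, and only an
* explicit resume call unblocks it.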
*
* @par Performance
* `Publisher` objects are relatively cheap to create, copy, and move. However,
* each `Publisher` object must be created with a
* `std::shared_ptr<PublisherConnection>`, which itself is relatively expensive
* to create. Therefore, connection instances should be shared when possible.
* See the `MakePublisherConnection()` method and the `PublisherConnection`
* interface for more details.
*
* @par Thread Safety
* Instances of this class created via copy-construction or copy-assignment
* share the underlying pool of connections. Access to these copies via multiple
* threads is guaranteed to work. Operating on the same instance of this class
* from two or more threads is not guaranteed to work.
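*
* @par Sketch: one copy per thread
* A minimal illustration of the pattern, assuming stand-in types:
* `ToyConnection`, `ToyClient`, and `DemoCopyPerThread()` are hypothetical and
* only mimic the relationship between a shared connection and a
* cheap-to-copy client. Each thread works on its own copy, and all copies
* share one connection.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the shared connection; it is internally
// synchronized so that many client copies can use it at once.
class ToyConnection {
 public:
  void Send() {
    std::lock_guard<std::mutex> lk(mu_);
    ++sent_;
  }
  int sent() {
    std::lock_guard<std::mutex> lk(mu_);
    return sent_;
  }

 private:
  std::mutex mu_;
  int sent_ = 0;
};

// Hypothetical stand-in for the client: a thin, copyable handle.
class ToyClient {
 public:
  explicit ToyClient(std::shared_ptr<ToyConnection> c) : c_(std::move(c)) {}
  void Send() { c_->Send(); }

 private:
  std::shared_ptr<ToyConnection> c_;
};

// Gives each thread its own copy of the client; returns the total sent.
inline int DemoCopyPerThread(int thread_count, int per_thread) {
  assert(thread_count >= 0 && per_thread >= 0);
  auto connection = std::make_shared<ToyConnection>();
  ToyClient client(connection);
  std::vector<std::thread> threads;
  for (int i = 0; i != thread_count; ++i) {
    // `client` is captured by value, so each thread owns a private copy.
    threads.emplace_back([client, per_thread]() mutable {
      for (int j = 0; j != per_thread; ++j) client.Send();
    });
  }
  for (auto& t : threads) t.join();
  return connection->sent();
}
```

* No two threads touch the same `ToyClient` object; only the connection is
* shared.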
*
* @par Background Threads
* This class uses the background threads configured via the `Options` from
* `GrpcOptionList`. Applications can create their own pool of background
* threads by (a) creating their own #google::cloud::CompletionQueue, (b)
* passing this completion queue as a `GrpcCompletionQueueOption`, and (c)
* attaching any number of threads to the completion queue.
*
* @par Example: using a custom thread pool
* @snippet samples.cc custom-thread-pool-publisher
*
* @par Asynchronous Functions
* Some of the member functions in this class return a `future<T>` (or
* `future<StatusOr<T>>`) object. Readers are probably familiar with
* [`std::future<T>`][std-future-link]. Our version adds a `.then()` function to
* attach a callback to the future, which is invoked when the future is
* satisfied. This function returns a `future<U>` where `U` is the return type
* of the attached function. See the #google::cloud::future documentation for
* more details.
*
* @par Error Handling
* This class uses `StatusOr<T>` to report errors. When an operation fails to
* perform its work, the returned `StatusOr<T>` contains the error details. If
* the `ok()` member function in the `StatusOr<T>` returns `true`, then it
* contains the expected result. Please consult the #google::cloud::StatusOr
* documentation for more details.
*
* @par Batching Configuration Example
* @snippet samples.cc publisher-options
*
* [std-future-link]: https://en.cppreference.com/w/cpp/thread/future
*/
class Publisher {
 public:
  explicit Publisher(std::shared_ptr<PublisherConnection> connection)
      : connection_(std::move(connection)) {}
  Publisher(Publisher const&) = default;
  Publisher& operator=(Publisher const&) = default;
  Publisher(Publisher&&) = default;
  Publisher& operator=(Publisher&&) = default;
  friend bool operator==(Publisher const& a, Publisher const& b) {
    return a.connection_ == b.connection_;
  }
  friend bool operator!=(Publisher const& a, Publisher const& b) {
    return !(a == b);
  }
/**
* Publishes a message to this publisher's topic.
*
* Note that the message may be batched, depending on the Publisher's
* configuration. It could be delayed until the batch has enough messages,
* or enough data, or enough time has elapsed. See the `PublisherOptionList`
* documentation for more details.
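*
* @par Sketch: batch flush criteria
* The three triggers can be pictured as a single predicate. This is a toy
* model under assumed names (`ToyBatchLimits`, `ShouldFlush()`, and
* `DemoBatching()` are hypothetical); it does not show how the library
* schedules batches.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

// Hypothetical limits; the real settings live in the publisher options.
struct ToyBatchLimits {
  std::size_t max_messages;
  std::size_t max_bytes;
  std::chrono::milliseconds max_hold_time;
};

// Returns true once any one of the three criteria is met.
inline bool ShouldFlush(ToyBatchLimits const& limits, std::size_t messages,
                        std::size_t bytes, std::chrono::milliseconds age) {
  return messages >= limits.max_messages || bytes >= limits.max_bytes ||
         age >= limits.max_hold_time;
}

// Checks each trigger in isolation.
inline void DemoBatching() {
  using ms = std::chrono::milliseconds;
  ToyBatchLimits const limits{100, 1024, ms(10)};
  assert(!ShouldFlush(limits, 1, 16, ms(1)));   // no limit reached yet
  assert(ShouldFlush(limits, 100, 16, ms(1)));  // enough messages
  assert(ShouldFlush(limits, 1, 1024, ms(1)));  // enough data
  assert(ShouldFlush(limits, 1, 16, ms(10)));   // enough time has elapsed
}
```

* A message published into a nearly empty batch may therefore wait until the
* hold time expires, unless the application flushes the publisher first.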
*
* @par Idempotency
* This is a non-idempotent operation, but the client library will
* automatically retry RPCs that fail with transient errors. As Cloud Pub/Sub
* has "at least once" delivery semantics, applications are expected to handle
* duplicate messages without problems. The application can disable retries
* by changing the retry policy; see the example below.
*
* @par Example
* @snippet samples.cc publish
*
* @par Disabling Retries Example
* @snippet samples.cc publisher-disable-retries
*
* @par Changing Retry Parameters Example
* @snippet samples.cc publisher-retry-settings
*
* @return a future that becomes satisfied when the message is published or on
* an unrecoverable error. On success, the future is satisfied with the
* server-assigned ID of the message. IDs are guaranteed to be unique
* within the topic.
*/
  future<StatusOr<std::string>> Publish(Message m) {
    return connection_->Publish({std::move(m)});
  }
/**
* Forcibly publishes any batched messages.
*
* As applications can configure a `Publisher` to buffer messages, it is
* sometimes useful to flush them before any of the normal criteria to send
* the RPCs is met.
*
* @par Idempotency
* See the description in `Publish()`.
*
* @par Example
* @snippet samples.cc publish-custom-attributes
*
* @note This function does not return any status or error codes. The
* application can use the `future<StatusOr<std::string>>` returned by
* each `Publish()` call to find out what the results are.
*/
  void Flush() { connection_->Flush({}); }
/**
* Resumes publishing after an error.
*
* If the publisher options have message ordering enabled (see
* `MessageOrderingOption`), all messages for a key that experiences a failure
* will be rejected until the application calls this function.
*
* @par Idempotency
* This function never initiates a remote RPC, so there are no considerations
* around retrying it. Note, however, that more than one `Publish()` request
* may fail for the same ordering key. The application needs to call this
* function after **each** error before it can resume publishing messages
* with the same ordering key.
*
* @par Example
* @snippet samples.cc resume-publish
*/
  void ResumePublish(std::string ordering_key) {
    connection_->ResumePublish({std::move(ordering_key)});
  }
  /// @deprecated Use `Publisher(connection)` and provide any configuration
  /// options when initializing the @p connection object.
  GOOGLE_CLOUD_CPP_DEPRECATED("use `Publisher(connection)` instead")
  explicit Publisher(std::shared_ptr<PublisherConnection> connection,
                     PublisherOptions const& /* options*/)
      : Publisher(std::move(connection)) {}

 private:
  std::shared_ptr<PublisherConnection> connection_;
};
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H