-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
subscription.go
1528 lines (1366 loc) · 55.6 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
"github.com/google/uuid"
gax "github.com/googleapis/gax-go/v2"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
durpb "google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
vkit "cloud.google.com/go/pubsub/apiv1"
)
// Subscription is a reference to a PubSub subscription.
//
// References are built by newSubscription, which copies the tracing flag from
// the owning Client and assigns a newly generated UUID string as clientID.
type Subscription struct {
// c is the Client this reference was created from.
c *Client
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string
// Settings for pulling messages. Configure these before calling Receive.
ReceiveSettings ReceiveSettings
// NOTE(review): mu presumably guards receiveActive; the code that takes the
// lock is not visible in this part of the file, so confirm against Receive.
mu sync.Mutex
receiveActive bool
// clientID to be used across all streaming pull connections that are created.
// This indicates to the server that any guarantees made for a stream that
// disconnected will be made for the stream that is created to replace it.
clientID string
// enableTracing enables otel tracing of Pub/Sub messages on this subscription.
// This is configured at client instantiation, and allows
// disabling of tracing even when a tracer provider is detected.
enableTracing bool
}
// Subscription returns a reference to the subscription with the given ID in
// the client's own project. It is shorthand for SubscriptionInProject with the
// client's project ID.
func (c *Client) Subscription(id string) *Subscription {
	project := c.projectID
	return c.SubscriptionInProject(id, project)
}
// SubscriptionInProject returns a reference to the subscription with the given
// ID in the given project. The reference's name has the form
// "projects/<projectID>/subscriptions/<id>".
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
	fullName := "projects/" + projectID + "/subscriptions/" + id
	return newSubscription(c, fullName)
}
// newSubscription builds a Subscription reference for the fully qualified
// name. The reference starts with DefaultReceiveSettings, a newly generated
// UUID string as its clientID, and the client's tracing flag.
func newSubscription(c *Client, name string) *Subscription {
	sub := new(Subscription)
	sub.c = c
	sub.name = name
	sub.clientID = uuid.NewString()
	sub.ReceiveSettings = DefaultReceiveSettings
	sub.enableTracing = c.enableTracing
	return sub
}
// String returns the fully qualified name of the subscription, which is
// globally unique and printable.
func (s *Subscription) String() string {
	fullName := s.name
	return fullName
}
// ID returns the unique identifier of the subscription within its project,
// i.e. everything after the last "/" of the fully qualified name.
//
// It panics if the name contains no "/" at all, since such a name cannot be
// fully qualified.
func (s *Subscription) ID() string {
	idx := strings.LastIndexByte(s.name, '/')
	if idx < 0 {
		// name is not a fully-qualified name.
		panic("bad subscription name")
	}
	return s.name[idx+1:]
}
// Subscriptions returns an iterator over all of the subscriptions in the
// client's project.
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
	req := &pb.ListSubscriptionsRequest{Project: c.fullyQualifiedProjectName()}
	pbIt := c.subc.ListSubscriptions(ctx, req)

	// nextName advances the underlying iterator and yields only the
	// subscription's fully qualified name.
	nextName := func() (string, error) {
		pbSub, err := pbIt.Next()
		if err != nil {
			return "", err
		}
		return pbSub.Name, nil
	}

	return &SubscriptionIterator{c: c, it: pbIt, next: nextName}
}
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
// c is the Client passed on to the Subscription and SubscriptionConfig values the iterator returns.
c *Client
// it is the underlying apiv1 iterator; NextConfig reads from it directly.
it *vkit.SubscriptionIterator
// next yields the name of the next subscription; it is what Next calls.
next func() (string, error)
}
// Next returns a reference to the next subscription. Any error from the
// underlying iterator is returned unchanged; when there are no more
// subscriptions that error is iterator.Done.
func (subs *SubscriptionIterator) Next() (*Subscription, error) {
	name, err := subs.next()
	if err == nil {
		return newSubscription(subs.c, name), nil
	}
	return nil, err
}
// NextConfig returns the configuration of the next subscription. If there are
// no more subscriptions, iterator.Done will be returned.
//
// This call shares the underlying iterator with SubscriptionIterator.Next, so
// an item consumed by one is not seen by the other. To use both kinds of call,
// create a separate iterator instance for each.
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) {
	pbSub, err := subs.it.Next()
	if err != nil {
		return nil, err
	}
	cfg := new(SubscriptionConfig)
	if *cfg, err = protoToSubscriptionConfig(pbSub, subs.c); err != nil {
		return nil, err
	}
	return cfg, nil
}
// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
// A URL locating the endpoint to which messages should be pushed.
Endpoint string
// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
Attributes map[string]string
// AuthenticationMethod is used by push endpoints to verify the source
// of push requests.
// It can be used with push endpoints that are private by default to
// allow requests only from the Cloud Pub/Sub system, for example.
// This field is optional and should be set only by users interested in
// authenticated push.
//
// toProto currently forwards only *OIDCToken values; any other
// implementation is ignored.
AuthenticationMethod AuthenticationMethod
// The format of the delivered message to the push endpoint is defined by
// the chosen wrapper. When unset, `PubsubWrapper` is used.
//
// toProto forwards *PubsubWrapper and *NoWrapper values; any other
// implementation is ignored.
Wrapper Wrapper
}
// toProto converts the push configuration to its wire representation.
// A nil receiver converts to nil.
func (pc *PushConfig) toProto() *pb.PushConfig {
	if pc == nil {
		return nil
	}
	out := &pb.PushConfig{
		PushEndpoint: pc.Endpoint,
		Attributes:   pc.Attributes,
	}

	// OIDC tokens are the only authentication method converted here; an unset
	// or unrecognized method leaves the field empty.
	// TODO: add others here when GAIC adds more definitions.
	if tok, ok := pc.AuthenticationMethod.(*OIDCToken); ok {
		out.AuthenticationMethod = tok.toProto()
	}

	// An unset or unrecognized wrapper leaves the field empty.
	switch w := pc.Wrapper.(type) {
	case *PubsubWrapper:
		out.Wrapper = w.toProto()
	case *NoWrapper:
		out.Wrapper = w.toProto()
	}
	return out
}
// AuthenticationMethod is used by push subscriptions to verify the source of push requests.
//
// Its only method is unexported, so implementations must live in this package.
type AuthenticationMethod interface {
isAuthMethod() bool
}
// OIDCToken allows PushConfigs to be authenticated using
// the OpenID Connect protocol https://openid.net/connect/
type OIDCToken struct {
// Audience to be used when generating OIDC token. The audience claim
// identifies the recipients that the JWT is intended for. The audience
// value is a single case-sensitive string. Having multiple values (array)
// for the audience field is not supported. More info about the OIDC JWT
// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
// Note: if not specified, the Push endpoint URL will be used.
Audience string
// The service account email to be used for generating the OpenID Connect token.
// The caller of any of:
// * CreateSubscription
// * UpdateSubscription
// * ModifyPushConfig
// must have the iam.serviceAccounts.actAs permission for the service account.
// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
ServiceAccountEmail string
}
// Compile-time check that *OIDCToken implements AuthenticationMethod.
var _ AuthenticationMethod = (*OIDCToken)(nil)

// isAuthMethod marks *OIDCToken as an AuthenticationMethod. It always
// reports true.
func (oidcToken *OIDCToken) isAuthMethod() bool {
	return true
}
// toProto wraps the token in the oneof wrapper type used by pb.PushConfig.
// A nil receiver converts to nil.
func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
	if oidcToken == nil {
		return nil
	}
	tok := &pb.PushConfig_OidcToken{
		Audience:            oidcToken.Audience,
		ServiceAccountEmail: oidcToken.ServiceAccountEmail,
	}
	return &pb.PushConfig_OidcToken_{OidcToken: tok}
}
// Wrapper defines the format of message delivered to push endpoints.
//
// Its only method is unexported, so implementations must live in this package.
type Wrapper interface {
isWrapper() bool
}
// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
// representation of a PubsubMessage
// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
// It carries no options of its own.
type PubsubWrapper struct{}
// Compile-time check that *PubsubWrapper implements Wrapper.
var _ Wrapper = (*PubsubWrapper)(nil)

// isWrapper marks *PubsubWrapper as a Wrapper. It always reports true.
func (p *PubsubWrapper) isWrapper() bool {
	return true
}
// toProto returns the oneof wrapper value selecting the Pub/Sub wrapper in
// pb.PushConfig. A nil receiver converts to nil.
func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
	if p != nil {
		return &pb.PushConfig_PubsubWrapper_{
			PubsubWrapper: new(pb.PushConfig_PubsubWrapper),
		}
	}
	return nil
}
// NoWrapper denotes not wrapping the payload sent to the push endpoint.
type NoWrapper struct {
// WriteMetadata is copied unchanged into the wire-format NoWrapper message.
// NOTE(review): presumably controls whether message metadata is delivered
// alongside the raw payload — confirm against the Pub/Sub API reference.
WriteMetadata bool
}
// Compile-time check that *NoWrapper implements Wrapper.
var _ Wrapper = (*NoWrapper)(nil)

// isWrapper marks *NoWrapper as a Wrapper. It always reports true.
func (n *NoWrapper) isWrapper() bool {
	return true
}
// toProto returns the oneof wrapper value selecting "no wrapper" in
// pb.PushConfig, carrying over WriteMetadata. A nil receiver converts to nil.
func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
	if n == nil {
		return nil
	}
	inner := &pb.PushConfig_NoWrapper{WriteMetadata: n.WriteMetadata}
	return &pb.PushConfig_NoWrapper_{NoWrapper: inner}
}
// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int
// The constants below are declared without an explicit type, so they are
// untyped integer constants (0 through 4) rather than BigQueryConfigState values.
const (
// BigQueryConfigStateUnspecified is the default value. This value is unused.
BigQueryConfigStateUnspecified = iota
// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
BigQueryConfigActive
// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
BigQueryConfigPermissionDenied
// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
BigQueryConfigNotFound
// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
BigQueryConfigSchemaMismatch
)
// BigQueryConfig configures the subscription to deliver to a BigQuery table.
//
// A zero-valued BigQueryConfig is treated by toProto as "no BigQuery config":
// it converts to nil, which is the sentinel for clearing the config.
type BigQueryConfig struct {
// The name of the table to which to write data, of the form
// {projectId}:{datasetId}.{tableId}
Table string
// When true, use the topic's schema as the columns to write to in BigQuery,
// if it exists.
UseTopicSchema bool
// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key to additional columns in the table. The
// subscription name, message_id, and publish_time fields are put in their own
// columns while all other message properties (other than data) are written to
// a JSON object in the attributes column.
WriteMetadata bool
// When true and UseTopicSchema is true, any fields that are a part of the
// topic schema that are not part of the BigQuery table schema are dropped
// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
// any messages with extra fields are not written and remain in the
// subscription's backlog.
DropUnknownFields bool
// This is an output-only field that indicates whether or not the subscription can
// receive messages. This field is set only in responses from the server;
// it is ignored if it is set in any requests.
State BigQueryConfigState
}
// toProto converts the BigQuery configuration to its wire representation.
//
// Both a nil receiver and a zero-valued config convert to nil; the zero value
// is the sentinel for clearing the BigQuery config and switching back to pull.
func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
	if bc == nil || *bc == (BigQueryConfig{}) {
		return nil
	}
	return &pb.BigQueryConfig{
		Table:             bc.Table,
		UseTopicSchema:    bc.UseTopicSchema,
		WriteMetadata:     bc.WriteMetadata,
		DropUnknownFields: bc.DropUnknownFields,
		State:             pb.BigQueryConfig_State(bc.State),
	}
}
// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
type CloudStorageConfigState int
// The constants below are declared without an explicit type, so they are
// untyped integer constants (0 through 3) rather than CloudStorageConfigState values.
const (
// CloudStorageConfigStateUnspecified is the default value. This value is unused.
CloudStorageConfigStateUnspecified = iota
// CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage.
CloudStorageConfigActive
// CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud Storage bucket because of permission denied errors.
CloudStorageConfigPermissionDenied
// CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist.
CloudStorageConfigNotFound
)
// isCloudStorageOutputFormat is implemented by the configuration options for
// how to write the message data to Cloud Storage. Its only method is
// unexported, so implementations must live in this package.
type isCloudStorageOutputFormat interface {
isCloudStorageOutputFormat()
}
// CloudStorageOutputFormatTextConfig is the configuration for writing
// message data in text format. Message payloads will be written to files
// as raw text, separated by a newline. It carries no options of its own.
type CloudStorageOutputFormatTextConfig struct{}
// CloudStorageOutputFormatAvroConfig is the configuration for writing
// message data in Avro format. Message payloads and metadata will be written
// to the files as an Avro binary.
type CloudStorageOutputFormatAvroConfig struct {
// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key as additional fields in the output.
WriteMetadata bool
}
// isCloudStorageOutputFormat marks *CloudStorageOutputFormatTextConfig as a
// Cloud Storage output format. It does nothing.
func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {
}

// isCloudStorageOutputFormat marks *CloudStorageOutputFormatAvroConfig as a
// Cloud Storage output format. It does nothing.
func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {
}
// CloudStorageConfig configures the subscription to deliver to Cloud Storage.
//
// A zero-valued CloudStorageConfig is treated by toProto the same as nil,
// i.e. as clearing this setting.
type CloudStorageConfig struct {
// User-provided name for the Cloud Storage bucket.
// The bucket must be created by the user. The bucket name must be without
// any prefix like "gs://". See the [bucket naming
// requirements](https://cloud.google.com/storage/docs/buckets#naming).
Bucket string
// User-provided prefix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenamePrefix string
// User-provided suffix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenameSuffix string
// Configuration for how to write message data. Options are
// *CloudStorageOutputFormatTextConfig and *CloudStorageOutputFormatAvroConfig.
// Defaults to text format.
OutputFormat isCloudStorageOutputFormat
// The maximum duration that can elapse before a new Cloud Storage file is
// created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed
// the subscription's acknowledgement deadline.
// Leave nil to omit the value from requests.
MaxDuration optional.Duration
// The maximum bytes that can be written to a Cloud Storage file before a new
// file is created. Min 1 KB, max 10 GiB. The MaxBytes limit may be exceeded
// in cases where messages are larger than the limit.
MaxBytes int64
// Output only. Indicates whether or not the subscription can receive
// messages.
State CloudStorageConfigState
}
// toProto converts the Cloud Storage configuration to its wire
// representation.
//
// A nil receiver and a zero-valued config both convert to nil: for the
// purposes of the live service an empty config means clearing this setting.
func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig {
	if cs == nil || *cs == (CloudStorageConfig{}) {
		return nil
	}

	// MaxDuration is optional; it stays unset on the wire unless provided.
	var maxDur *durationpb.Duration
	if cs.MaxDuration != nil {
		maxDur = durationpb.New(optional.ToDuration(cs.MaxDuration))
	}

	out := &pb.CloudStorageConfig{
		Bucket:         cs.Bucket,
		FilenamePrefix: cs.FilenamePrefix,
		FilenameSuffix: cs.FilenameSuffix,
		MaxDuration:    maxDur,
		MaxBytes:       cs.MaxBytes,
		State:          pb.CloudStorageConfig_State(cs.State),
	}

	// An unset or unrecognized output format leaves the field empty.
	switch format := cs.OutputFormat.(type) {
	case *CloudStorageOutputFormatTextConfig:
		out.OutputFormat = &pb.CloudStorageConfig_TextConfig_{}
	case *CloudStorageOutputFormatAvroConfig:
		out.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{
			AvroConfig: &pb.CloudStorageConfig_AvroConfig{
				WriteMetadata: format.WriteMetadata,
			},
		}
	}
	return out
}
// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int
// The constants below are declared without an explicit type, so they are
// untyped integer constants (0 through 2) rather than SubscriptionState values.
const (
// SubscriptionStateUnspecified is the default value. This value is unused.
SubscriptionStateUnspecified = iota
// SubscriptionStateActive means the subscription can actively receive messages.
SubscriptionStateActive
// SubscriptionStateResourceError means the subscription cannot receive messages because of an
// error with the resource to which it pushes messages.
// See the more detailed error state in the corresponding configuration.
SubscriptionStateResourceError
)
// SubscriptionConfig describes the configuration of a subscription. If none of
// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will
// pull and ack messages using API methods. At most one of these fields may be set.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>".
// It is filled in only when the config is converted from a server response.
name string
// The topic from which this subscription is receiving messages.
Topic *Topic
// If push delivery is used with this subscription, this field is
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
PushConfig PushConfig
// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
BigQueryConfig BigQueryConfig
// If delivery to Cloud Storage is used with this subscription, this field is
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
CloudStorageConfig CloudStorageConfig
// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
// deadline, as the deadline will be automatically extended.
AckDeadline time.Duration
// Whether to retain acknowledged messages. If true, acknowledged messages
// will not be expunged until they fall out of the RetentionDuration window.
RetainAckedMessages bool
// How long to retain messages in backlog, from the time of publish. If
// RetainAckedMessages is true, this duration affects the retention of
// acknowledged messages, otherwise only unacknowledged messages are retained.
// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
RetentionDuration time.Duration
// Expiration policy specifies the conditions for a subscription's expiration.
// A subscription is considered active as long as any connected subscriber is
// successfully consuming messages from the subscription or is issuing
// operations on the subscription. If `expiration_policy` is not set, a
// *default policy* with `ttl` of 31 days will be used. The minimum allowed
// value for `expiration_policy.ttl` is 1 day.
//
// Use time.Duration(0) to indicate that the subscription should never expire.
ExpirationPolicy optional.Duration
// The set of labels for the subscription.
Labels map[string]string
// EnableMessageOrdering enables message ordering on this subscription.
// This value is only used for subscription creation and update, and
// is not read locally in calls like Subscription.Receive().
//
// If set to false, even if messages are published with ordering keys,
// messages will not be delivered in order.
//
// When calling Subscription.Receive(), the client will check this
// value with a call to Subscription.Config(), which requires the
// roles/viewer or roles/pubsub.viewer role on your service account.
// If that call fails, messages with ordering keys will be delivered in order.
EnableMessageOrdering bool
// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription. If not set, dead lettering is disabled.
DeadLetterPolicy *DeadLetterPolicy
// Filter is an expression written in the Cloud Pub/Sub filter language. If
// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
// filter are delivered on this subscription. If empty, then no messages are
// filtered out. Cannot be changed after the subscription is created.
Filter string
// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
RetryPolicy *RetryPolicy
// Detached indicates whether the subscription is detached from its topic.
// Detached subscriptions don't receive messages from their topic and don't
// retain any backlog. `Pull` and `StreamingPull` requests will return
// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
// the endpoint will not be made.
Detached bool
// TopicMessageRetentionDuration indicates the minimum duration for which a message is
// retained after it is published to the subscription's topic. If this field is
// set, messages published to the subscription's topic in the last
// `TopicMessageRetentionDuration` are always available to subscribers.
// You can enable both topic and subscription retention for the same topic.
// In this situation, the maximum of the retention durations takes effect.
//
// This is an output only field, meaning it will only appear in responses from the backend
// and will be ignored if sent in a request.
TopicMessageRetentionDuration time.Duration
// EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees
// for the delivery of a message with a given MessageID on this subscription:
//
// The message sent to a subscriber is guaranteed not to be resent
// before the message's acknowledgement deadline expires.
// An acknowledged message will not be resent to a subscriber.
//
// Note that subscribers may still receive multiple copies of a message
// when `enable_exactly_once_delivery` is true if the message was published
// multiple times by a publisher client. These copies are considered distinct
// by Pub/Sub and have distinct MessageID values.
//
// Lastly, to guarantee messages have been acked or nacked properly, you must
// call Message.AckWithResult() or Message.NackWithResult(). These return an
// AckResult which will be ready if the message has been acked (or failed to be acked).
EnableExactlyOnceDelivery bool
// State indicates whether or not the subscription can receive messages.
// This is an output-only field: it is set only in responses from the server
// and is ignored if it is set in any requests.
State SubscriptionState
}
// String returns the fully qualified name of the subscription this config
// describes. The name is filled in only when the config was converted from a
// server response; for a config built by hand it is the empty string.
func (s *SubscriptionConfig) String() string {
	fullName := s.name
	return fullName
}
// ID returns the unique identifier of the subscription within its project,
// i.e. everything after the last "/" of the fully qualified name.
//
// The name is filled in only when the config was converted from a server
// response. When it is empty, or contains no "/", ID returns the empty string.
func (s *SubscriptionConfig) ID() string {
	if idx := strings.LastIndexByte(s.name, '/'); idx >= 0 {
		return s.name[idx+1:]
	}
	return ""
}
// toProto converts the configuration into a wire-format subscription with the
// given fully qualified name.
//
// Optional parts are left unset rather than sent as empty messages:
//   - PushConfig is included only when it has an endpoint, attributes, or an
//     authentication method;
//   - BigQueryConfig and CloudStorageConfig are omitted when zero-valued;
//   - MessageRetentionDuration is omitted when RetentionDuration is zero;
//   - DeadLetterPolicy and RetryPolicy are omitted when nil (their own
//     toProto methods accept a nil receiver).
//
// A nil Topic yields an empty topic name instead of a nil-pointer panic, so
// that the problem surfaces as an error from the service call.
func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
	var pbPushConfig *pb.PushConfig
	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
		pbPushConfig = cfg.PushConfig.toProto()
	}

	var topicName string
	if cfg.Topic != nil {
		topicName = cfg.Topic.name
	}

	var retentionDuration *durpb.Duration
	if cfg.RetentionDuration != 0 {
		retentionDuration = durpb.New(cfg.RetentionDuration)
	}

	return &pb.Subscription{
		Name:                      name,
		Topic:                     topicName,
		PushConfig:                pbPushConfig,
		BigqueryConfig:            cfg.BigQueryConfig.toProto(),
		CloudStorageConfig:        cfg.CloudStorageConfig.toProto(),
		AckDeadlineSeconds:        trunc32(int64(cfg.AckDeadline.Seconds())),
		RetainAckedMessages:       cfg.RetainAckedMessages,
		MessageRetentionDuration:  retentionDuration,
		Labels:                    cfg.Labels,
		ExpirationPolicy:          expirationPolicyToProto(cfg.ExpirationPolicy),
		EnableMessageOrdering:     cfg.EnableMessageOrdering,
		DeadLetterPolicy:          cfg.DeadLetterPolicy.toProto(),
		Filter:                    cfg.Filter,
		RetryPolicy:               cfg.RetryPolicy.toProto(),
		Detached:                  cfg.Detached,
		EnableExactlyOnceDelivery: cfg.EnableExactlyOnceDelivery,
	}
}
// protoToSubscriptionConfig converts a wire-format subscription into a
// SubscriptionConfig whose Topic is bound to the client c.
//
// The returned error is always nil.
func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
	// A subscription without an explicit retention window is reported with
	// a retention of seven days.
	retention := 7 * 24 * time.Hour
	if d := pbSub.MessageRetentionDuration; d != nil {
		retention = d.AsDuration()
	}

	// A missing expiration TTL is reported as a zero duration.
	var ttl time.Duration
	if t := pbSub.ExpirationPolicy.GetTtl(); t != nil {
		ttl = t.AsDuration()
	}

	deadLetter := protoToDLP(pbSub.DeadLetterPolicy)
	retry := protoToRetryPolicy(pbSub.RetryPolicy)

	cfg := SubscriptionConfig{
		name:                          pbSub.Name,
		Topic:                         newTopic(c, pbSub.Topic),
		AckDeadline:                   time.Duration(pbSub.AckDeadlineSeconds) * time.Second,
		RetainAckedMessages:           pbSub.RetainAckedMessages,
		RetentionDuration:             retention,
		Labels:                        pbSub.Labels,
		ExpirationPolicy:              ttl,
		EnableMessageOrdering:         pbSub.EnableMessageOrdering,
		DeadLetterPolicy:              deadLetter,
		Filter:                        pbSub.Filter,
		RetryPolicy:                   retry,
		Detached:                      pbSub.Detached,
		TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
		EnableExactlyOnceDelivery:     pbSub.EnableExactlyOnceDelivery,
		State:                         SubscriptionState(pbSub.State),
	}

	// The delivery configs are value fields; each keeps its zero value when
	// the corresponding message is absent.
	if push := protoToPushConfig(pbSub.PushConfig); push != nil {
		cfg.PushConfig = *push
	}
	if bigQuery := protoToBQConfig(pbSub.GetBigqueryConfig()); bigQuery != nil {
		cfg.BigQueryConfig = *bigQuery
	}
	if storage := protoToStorageConfig(pbSub.GetCloudStorageConfig()); storage != nil {
		cfg.CloudStorageConfig = *storage
	}
	return cfg, nil
}
// protoToPushConfig converts a wire-format push configuration into a
// PushConfig. A nil input converts to nil.
//
// Only the OIDC token authentication method and the Pub/Sub / no-wrapper
// message wrappers are recognized; anything else leaves the corresponding
// field unset. Oneof wrapper values whose inner message is nil are tolerated
// rather than dereferenced.
func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
	if pbPc == nil {
		return nil
	}
	pc := &PushConfig{
		Endpoint:   pbPc.PushEndpoint,
		Attributes: pbPc.Attributes,
	}
	if am := pbPc.AuthenticationMethod; am != nil {
		if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
			pc.AuthenticationMethod = &OIDCToken{
				Audience:            oidcToken.OidcToken.GetAudience(),
				ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
			}
		}
	}
	if w := pbPc.Wrapper; w != nil {
		switch wt := w.(type) {
		case *pb.PushConfig_PubsubWrapper_:
			pc.Wrapper = &PubsubWrapper{}
		case *pb.PushConfig_NoWrapper_:
			noWrapper := &NoWrapper{}
			if wt != nil {
				// The generated getter is nil-safe, so a wrapper with a
				// nil inner message reads as WriteMetadata == false.
				noWrapper.WriteMetadata = wt.NoWrapper.GetWriteMetadata()
			}
			pc.Wrapper = noWrapper
		}
	}
	return pc
}
// protoToBQConfig converts a wire-format BigQuery configuration into a
// BigQueryConfig. A nil input converts to nil.
func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
	if pbBQ == nil {
		return nil
	}
	return &BigQueryConfig{
		Table:             pbBQ.GetTable(),
		UseTopicSchema:    pbBQ.GetUseTopicSchema(),
		DropUnknownFields: pbBQ.GetDropUnknownFields(),
		WriteMetadata:     pbBQ.GetWriteMetadata(),
		State:             BigQueryConfigState(pbBQ.State),
	}
}
// protoToStorageConfig converts a wire-format Cloud Storage configuration into
// a CloudStorageConfig. A nil input converts to nil.
//
// MaxDuration is left nil when the wire value is absent or zero, and
// OutputFormat is left nil when the wire value is absent or unrecognized.
func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig {
	if pbCSC == nil {
		return nil
	}
	out := &CloudStorageConfig{
		Bucket:         pbCSC.GetBucket(),
		FilenamePrefix: pbCSC.GetFilenamePrefix(),
		FilenameSuffix: pbCSC.GetFilenameSuffix(),
		MaxBytes:       pbCSC.GetMaxBytes(),
		State:          CloudStorageConfigState(pbCSC.GetState()),
	}
	if maxDur := pbCSC.GetMaxDuration().AsDuration(); maxDur != 0 {
		out.MaxDuration = maxDur
	}
	switch format := pbCSC.OutputFormat.(type) {
	case *pb.CloudStorageConfig_TextConfig_:
		out.OutputFormat = &CloudStorageOutputFormatTextConfig{}
	case *pb.CloudStorageConfig_AvroConfig_:
		out.OutputFormat = &CloudStorageOutputFormatAvroConfig{
			WriteMetadata: format.AvroConfig.GetWriteMetadata(),
		}
	}
	return out
}
// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
// DeadLetterTopic is sent to the service unchanged. toProto treats an empty
// value as "no policy" and returns nil.
// NOTE(review): presumably the fully qualified name of the topic that
// receives dead-lettered messages — confirm against the API reference.
DeadLetterTopic string
// MaxDeliveryAttempts is converted to int32 when sent to the service.
// NOTE(review): presumably the number of delivery attempts made before a
// message is dead lettered — confirm the allowed range against the API reference.
MaxDeliveryAttempts int
}
// toProto converts the policy to its wire representation. A nil receiver, or
// a policy without a dead letter topic, converts to nil.
//
// MaxDeliveryAttempts is narrowed with a plain int32 conversion.
func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
	if dlp != nil && dlp.DeadLetterTopic != "" {
		return &pb.DeadLetterPolicy{
			DeadLetterTopic:     dlp.DeadLetterTopic,
			MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
		}
	}
	return nil
}
// protoToDLP converts a wire-format dead letter policy into a
// DeadLetterPolicy. A nil input converts to nil.
func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
	if pbDLP == nil {
		return nil
	}
	policy := new(DeadLetterPolicy)
	policy.DeadLetterTopic = pbDLP.GetDeadLetterTopic()
	policy.MaxDeliveryAttempts = int(pbDLP.MaxDeliveryAttempts)
	return policy
}
// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
//
// Retry delay will be exponential based on provided minimum and maximum
// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
//
// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
// events for a given message.
//
// Retry Policy is implemented on a best effort basis. At times, the delay
// between consecutive deliveries may not match the configuration. That is,
// delay can be more or less than configured backoff.
//
// A RetryPolicy with both fields nil is taken by toProto as an instruction to
// remove the retry policy from the subscription.
type RetryPolicy struct {
// MinimumBackoff is the minimum delay between consecutive deliveries of a
// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
// toProto only sends values greater than zero.
MinimumBackoff optional.Duration
// MaximumBackoff is the maximum delay between consecutive deliveries of a
// given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds.
// toProto only sends values greater than zero.
MaximumBackoff optional.Duration
}
// toProto converts the policy to its proto form.
//
// A nil receiver converts to nil. A RetryPolicy with neither backoff set also
// converts to nil; this is taken as an instruction to remove RetryPolicy from
// the subscription.
//
// Otherwise a non-nil proto is returned in which each backoff is populated
// only if it was provided and is strictly positive; any other backoff field
// is left nil.
func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
	if rp == nil || (rp.MinimumBackoff == nil && rp.MaximumBackoff == nil) {
		return nil
	}

	out := &pb.RetryPolicy{}
	if rp.MinimumBackoff != nil {
		if d := optional.ToDuration(rp.MinimumBackoff); d > 0 {
			out.MinimumBackoff = durpb.New(d)
		}
	}
	if rp.MaximumBackoff != nil {
		if d := optional.ToDuration(rp.MaximumBackoff); d > 0 {
			out.MaximumBackoff = durpb.New(d)
		}
	}
	return out
}
// protoToRetryPolicy converts a proto retry policy into the client-side
// RetryPolicy. A nil input produces a nil result.
//
// Both backoff fields of the result are always set to a time.Duration: the
// proto's value when present, and zero when the proto field is nil.
func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
	if rp == nil {
		return nil
	}
	policy := &RetryPolicy{
		MinimumBackoff: time.Duration(0),
		MaximumBackoff: time.Duration(0),
	}
	if lo := rp.MinimumBackoff; lo != nil {
		policy.MinimumBackoff = lo.AsDuration()
	}
	if hi := rp.MaximumBackoff; hi != nil {
		policy.MaximumBackoff = hi.AsDuration()
	}
	return policy
}
// ReceiveSettings configures the Receive method.
// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct {
// MaxExtension is the maximum period for which the Subscription should
// automatically extend the ack deadline for each message.
//
// The Subscription will automatically extend the ack deadline of all
// fetched Messages up to the duration specified. Automatic deadline
// extension beyond the initial receipt may be disabled by specifying a
// duration less than 0.
MaxExtension time.Duration
// MaxExtensionPeriod is the maximum duration by which to extend the ack
// deadline at a time. The ack deadline will continue to be extended by up
// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
// bounds the maximum amount of time before a message redelivery in the
// event the subscriber fails to extend the deadline.
//
// MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration
// can be disabled by specifying a duration less than (or equal to) 0.
MaxExtensionPeriod time.Duration
// MinExtensionPeriod is the minimum duration for a single lease extension attempt.
// By default the 99th percentile of ack latency is used to determine lease extension
// periods but this value can be set to minimize the number of extraneous RPCs sent.
//
// MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration
// can be disabled by specifying a duration less than (or equal to) 0.
// Defaults to off but set to 60 seconds if the subscription has exactly-once delivery enabled,
// which will be added in a future release.
MinExtensionPeriod time.Duration
// MaxOutstandingMessages is the maximum number of unprocessed messages
// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
// If the value is negative, then there will be no limit on the number of
// unprocessed messages.
MaxOutstandingMessages int
// MaxOutstandingBytes is the maximum size of unprocessed messages
// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
// the value is negative, then there will be no limit on the number of bytes
// for unprocessed messages.
MaxOutstandingBytes int
// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
// PubSub server and the less accurate method of only enforcing flow control
// at the client side is used.
// The default is false.
UseLegacyFlowControl bool
// NumGoroutines sets the number of StreamingPull streams to pull messages
// from the subscription.
//
// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
//
// NumGoroutines does not limit the number of messages that can be processed
// concurrently. Even with one goroutine, many messages might be processed at
// once, because that goroutine may continually receive messages and invoke the
// function passed to Receive on them. To limit the number of messages being
// processed concurrently, set MaxOutstandingMessages.
NumGoroutines int
// Synchronous switches the underlying receiving mechanism to unary Pull.
// When Synchronous is false, the more performant StreamingPull is used.
// StreamingPull also has the benefit of subscriber affinity when using
// ordered delivery.
// When Synchronous is true, NumGoroutines is set to 1 and only one Pull
// RPC will be made to poll messages at a time.
// The default is false.
//
// Deprecated.
// Previously, users might use Synchronous mode since StreamingPull had a limitation
// where MaxOutstandingMessages was not always respected with large batches of
// small messages. With server side flow control, this is no longer an issue
// and we recommend switching to the default StreamingPull mode by setting
// Synchronous to false.
// Synchronous mode does not work with exactly once delivery.
Synchronous bool
}
// synchronousWaitTime is, for synchronous receive, the time to wait if we are
// already processing MaxOutstandingMessages. There is no point calling Pull
// and asking for zero messages, so we pause to allow some message-processing
// callbacks to finish.
//
// The wait time is large enough to avoid consuming significant CPU, but
// small enough to provide decent throughput. Users who want better
// throughput should not be using synchronous mode.
//
// Waiting might seem like polling, so it's natural to think we could do better by
// noticing when a callback is finished and immediately calling Pull. But if
// callbacks finish in quick succession, this will result in frequent Pull RPCs that
// request a single message, which wastes network bandwidth. Better to wait for a few
// callbacks to finish, so we make fewer RPCs fetching more messages.
//
// This value is unexported so the user doesn't have another knob to think about. Note that
// it is the same value as the one used for nackTicker, so it matches this client's
// idea of a duration that is short, but not so short that we perform excessive RPCs.
const synchronousWaitTime = 100 * time.Millisecond
// DefaultReceiveSettings holds the default values for ReceiveSettings.
//
// UseLegacyFlowControl and Synchronous are not listed and therefore keep
// their zero value, false.
var DefaultReceiveSettings = ReceiveSettings{
MaxExtension: 60 * time.Minute,
// A value of 0 disables MaxExtensionPeriod (see ReceiveSettings).
MaxExtensionPeriod: 0,
// A value of 0 disables MinExtensionPeriod (see ReceiveSettings).
MinExtensionPeriod: 0,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9, // 1G
NumGoroutines: 10,
}
// Delete issues a DeleteSubscription request for this subscription and
// returns the error, if any, reported by that call.
func (s *Subscription) Delete(ctx context.Context) error {
	req := &pb.DeleteSubscriptionRequest{
		Subscription: s.name,
	}
	return s.c.subc.DeleteSubscription(ctx, req)
}
// Exists reports whether the subscription exists on the server.
//
// It issues a GetSubscription request: success means the subscription exists,
// a NotFound status means it does not (reported as false with a nil error),
// and any other error is returned unchanged alongside false.
func (s *Subscription) Exists(ctx context.Context) (bool, error) {
	req := &pb.GetSubscriptionRequest{Subscription: s.name}
	_, err := s.c.subc.GetSubscription(ctx, req)
	switch {
	case err == nil:
		return true, nil
	case status.Code(err) == codes.NotFound:
		return false, nil
	default:
		return false, err
	}
}