-
Notifications
You must be signed in to change notification settings - Fork 7
/
pubsub_source.go
173 lines (142 loc) · 4.8 KB
/
pubsub_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package pubsubsource
import (
"context"
"fmt"
"time"
"cloud.google.com/go/pubsub"
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/source/sourceiface"
)
// Configuration holds the user-facing settings for the PubSub source.
//
// Fields are populated from HCL attributes or environment variables
// according to their struct tags (presumably by the shared config loader).
type Configuration struct {
	// ProjectID is the GCP project that owns the subscription.
	ProjectID string `hcl:"project_id" env:"SOURCE_PUBSUB_PROJECT_ID"`
	// SubscriptionID is the PubSub subscription to pull messages from.
	SubscriptionID string `hcl:"subscription_id" env:"SOURCE_PUBSUB_SUBSCRIPTION_ID"`
	// ConcurrentWrites is applied as the subscription's
	// ReceiveSettings.NumGoroutines; ProvideDefault sets it to 50.
	ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
}
// pubSubSource reads messages from a single PubSub subscription through a
// PubSub client. Instances are constructed by newPubSubSource.
type pubSubSource struct {
	projectID        string         // GCP project; used by GetID and in log fields
	client           *pubsub.Client // client used by Read to open the subscription
	subscriptionID   string         // subscription that Read pulls from
	concurrentWrites int            // applied as ReceiveSettings.NumGoroutines in Read
	log              *log.Entry     // logger pre-tagged with source/cloud/project/subscription

	// cancel stops the in-flight Receive call: Read stores it, Stop invokes
	// and clears it.
	// NOTE(review): cancel is written in Read and read in Stop with no
	// synchronization; if those run on different goroutines this is a data
	// race — consider guarding it with a mutex.
	cancel context.CancelFunc
}
// configFunction builds a PubSub source from a parsed Configuration.
//
// The result of newPubSubSource is checked explicitly rather than returned
// directly. newPubSubSource returns a concrete *pubSubSource, and passing a
// nil *pubSubSource through the sourceiface.Source interface would give
// callers a non-nil interface that wraps a nil pointer on the error path.
// Returning a literal nil keeps "source == nil" meaningful when err != nil.
func configFunction(c *Configuration) (sourceiface.Source, error) {
	source, err := newPubSubSource(
		c.ConcurrentWrites,
		c.ProjectID,
		c.SubscriptionID,
	)
	if err != nil {
		return nil, err
	}
	return source, nil
}
// adapter lets a plain function serve as a pluggable PubSub source
// component. It implements the Pluggable interface.
type adapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface. It invokes the wrapped
// function with i and returns its results unchanged.
func (f adapter) Create(i interface{}) (interface{}, error) {
	result, err := f(i)
	return result, err
}
// ProvideDefault implements the ComponentConfigurable interface.
//
// It returns a *Configuration whose ConcurrentWrites is 50. All other
// fields are left at their zero values, to be filled from user config.
func (f adapter) ProvideDefault() (interface{}, error) {
	const defaultConcurrentWrites = 50

	defaults := &Configuration{}
	defaults.ConcurrentWrites = defaultConcurrentWrites
	return defaults, nil
}
// adapterGenerator wraps f so it can be registered as a pluggable PubSub
// source component.
//
// The returned adapter forwards a *Configuration input to f. Any other input
// type (including a nil interface) yields an error and a nil result.
func adapterGenerator(f func(c *Configuration) (sourceiface.Source, error)) adapter {
	return func(input interface{}) (interface{}, error) {
		switch cfg := input.(type) {
		case *Configuration:
			return f(cfg)
		default:
			return nil, errors.New("invalid input, expected PubSubSourceConfig")
		}
	}
}
// ConfigPair is passed to the configuration layer to decide when to build a
// PubSub source. The source is registered under the name "pubsub", and
// configFunction (wrapped by adapterGenerator) builds it.
var ConfigPair = config.ConfigurationPair{
	Handle: adapterGenerator(configFunction),
	Name:   "pubsub",
}
// newPubSubSource creates a pubSubSource backed by a new PubSub client for
// projectID.
//
// concurrentWrites is stored and later applied as the subscription's
// ReceiveSettings.NumGoroutines in Read. If the PubSub client cannot be
// created, the client error is returned wrapped with context.
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) {
	// Turn on the uuid package's random-byte pool. Per the uuid docs this
	// batches reads from the RNG to speed up v4 generation (Read calls
	// uuid.New for every message's PartitionKey). It is a package-global
	// setting.
	// NOTE(review): the uuid docs say EnableRandPool is not thread-safe with
	// concurrent UUID generation — confirm no other goroutine generates UUIDs
	// while sources are being constructed.
	uuid.EnableRandPool()

	client, err := pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		return nil, errors.Wrap(err, "Failed to create PubSub client")
	}

	logger := log.WithFields(log.Fields{
		"source":       "pubsub",
		"cloud":        "GCP",
		"project":      projectID,
		"subscription": subscriptionID,
	})

	return &pubSubSource{
		projectID:        projectID,
		subscriptionID:   subscriptionID,
		concurrentWrites: concurrentWrites,
		client:           client,
		log:              logger,
	}, nil
}
// Read pulls messages from the configured PubSub subscription. It blocks
// until the receive context is cancelled (see Stop) or Receive returns an
// error.
//
// Each pulled message is forwarded to sf.WriteToTarget as a one-element
// batch. Its AckFunc acks the PubSub message; presumably the target layer
// invokes it after a successful write (not visible here). If WriteToTarget
// fails, the error is only logged. The message is neither acked nor nacked
// here, so redelivery is left to the PubSub client's ack-deadline handling.
//
// The returned error, if any, wraps the error from Receive.
func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error {
	ps.log.Info("Reading messages from subscription ...")

	subscription := ps.client.Subscription(ps.subscriptionID)
	subscription.ReceiveSettings.NumGoroutines = ps.concurrentWrites

	receiveCtx, cancel := context.WithCancel(context.Background())
	// Keep the cancel func so Stop can halt Receive.
	ps.cancel = cancel

	handler := func(_ context.Context, msg *pubsub.Message) {
		pulledAt := time.Now().UTC()
		ps.log.Debugf("Read message with ID: %s", msg.ID)

		batch := []*models.Message{{
			Data: msg.Data,
			// A fresh random key per message; see uuid.EnableRandPool in newPubSubSource.
			PartitionKey: uuid.New().String(),
			AckFunc: func() {
				ps.log.Debugf("Ack'ing message with ID: %s", msg.ID)
				msg.Ack()
			},
			TimeCreated: msg.PublishTime.UTC(),
			TimePulled:  pulledAt,
		}}

		if writeErr := sf.WriteToTarget(batch); writeErr != nil {
			ps.log.WithFields(log.Fields{"error": writeErr}).Error(writeErr)
		}
	}

	if err := subscription.Receive(receiveCtx, handler); err != nil {
		return errors.Wrap(err, "Failed to read from PubSub topic")
	}
	return nil
}
// Stop halts Read by cancelling the receive context that Read created.
//
// It does nothing if Read has not yet stored a cancel function. After a
// call, the stored cancel function is cleared, so repeated calls are no-ops
// unless Read runs again.
// NOTE(review): a Stop that happens before Read stores its cancel function
// will not stop that later Read — confirm callers sequence these correctly.
func (ps *pubSubSource) Stop() {
	if cancel := ps.cancel; cancel != nil {
		ps.log.Warn("Cancelling PubSub receive ...")
		cancel()
	}
	ps.cancel = nil
}
// GetID returns the fully-qualified subscription resource name,
// "projects/<projectID>/subscriptions/<subscriptionID>", which is used as
// this source's identifier.
func (ps *pubSubSource) GetID() string {
	// fmt.Sprint adds no separators between adjacent string operands.
	return fmt.Sprint("projects/", ps.projectID, "/subscriptions/", ps.subscriptionID)
}