From e5780eab4abd982e49c19f98c3a88029ae2b6323 Mon Sep 17 00:00:00 2001 From: adatzer Date: Fri, 4 Oct 2024 16:58:03 +0300 Subject: [PATCH] Add configuration settings for PubSub source --- .../sources/pubsub-full-example.hcl | 8 +++- pkg/source/pubsub/pubsub_source.go | 40 ++++++++++++------- pkg/source/pubsub/pubsub_source_test.go | 4 +- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/assets/docs/configuration/sources/pubsub-full-example.hcl b/assets/docs/configuration/sources/pubsub-full-example.hcl index d74e828d..4a00ac8e 100644 --- a/assets/docs/configuration/sources/pubsub-full-example.hcl +++ b/assets/docs/configuration/sources/pubsub-full-example.hcl @@ -10,5 +10,11 @@ source { # Maximum concurrent goroutines (lightweight threads) for message processing (default: 50) concurrent_writes = 20 + + # Maximum number of unprocessed messages (default 1000) + max_outstanding_messages = 2000 + + # Maximum size of unprocessed messages (default 1e9) + max_outstanding_bytes = 2e9 } -} \ No newline at end of file +} diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index 9ec8ef99..76708b96 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -28,17 +28,21 @@ import ( // Configuration configures the source for records pulled type Configuration struct { - ProjectID string `hcl:"project_id"` - SubscriptionID string `hcl:"subscription_id"` - ConcurrentWrites int `hcl:"concurrent_writes,optional"` + ProjectID string `hcl:"project_id"` + SubscriptionID string `hcl:"subscription_id"` + ConcurrentWrites int `hcl:"concurrent_writes,optional"` + MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"` + MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"` } // pubSubSource holds a new client for reading messages from PubSub type pubSubSource struct { - projectID string - client *pubsub.Client - subscriptionID string - concurrentWrites int + projectID string + client *pubsub.Client + subscriptionID string + concurrentWrites int + maxOutstandingMessages int + maxOutstandingBytes int log *log.Entry @@ -52,6 +56,8 @@ func configFunction(c *Configuration) (sourceiface.Source, error) { c.ConcurrentWrites, c.ProjectID, c.SubscriptionID, + c.MaxOutstandingMessages, + c.MaxOutstandingBytes, ) } @@ -68,7 +74,9 @@ func (f adapter) Create(i interface{}) (interface{}, error) { func (f adapter) ProvideDefault() (interface{}, error) { // Provide defaults cfg := &Configuration{ - ConcurrentWrites: 50, + ConcurrentWrites: 50, + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, } return cfg, nil @@ -93,7 +101,7 @@ var ConfigPair = config.ConfigurationPair{ } // newPubSubSource creates a new client for reading messages from PubSub -func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) { +func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int) (*pubSubSource, error) { ctx := context.Background() // Ensures as even as possible distribution of UUIDs @@ -105,11 +113,13 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri } return &pubSubSource{ - projectID: projectID, - client: client, - subscriptionID: subscriptionID, - concurrentWrites: concurrentWrites, - log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}), + projectID: projectID, + client: client, + subscriptionID: subscriptionID, + concurrentWrites: concurrentWrites, + maxOutstandingMessages: maxOutstandingMessages, + maxOutstandingBytes: maxOutstandingBytes, + log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}), }, nil } @@ -121,6 +131,8 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error { sub := ps.client.Subscription(ps.subscriptionID) sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites + sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages + sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes cctx, cancel := context.WithCancel(ctx) diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index 0e218acd..db632d44 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -116,7 +116,7 @@ func TestNewPubSubSource_Success(t *testing.T) { testutil.InitMockPubsubServer(8010, nil, t) - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9) assert.Nil(err) assert.IsType(&pubSubSource{}, pubsubSource) // This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem. @@ -141,7 +141,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { } wg.Wait() - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9) assert.NotNil(pubsubSource) assert.Nil(err)