Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration settings for PubSub source #374

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion assets/docs/configuration/sources/pubsub-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,11 @@ source {

# Maximum concurrent goroutines (lightweight threads) for message processing (default: 50)
concurrent_writes = 20

# Maximum number of unprocessed messages (default 1000)
max_outstanding_messages = 2000

# Maximum size of unprocessed messages (default 1e9)
max_outstanding_bytes = 2e9

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with the go sdk for pubsub, but I am very familiar with the java sdk for pubsub.

In the java sdk, these settings apply per streaming pull. E.g. if the app opens 50 streaming pulls, then your default setting will allow 50 * 2e9 outstanding bytes in total across all streaming pulls, i.e. 100 GB.

In Snowbridge the number of streaming pulls is set on this line (NumGoRoutines). Snowbridge sets it equal to the value of concurrent writes.

Copy link
Contributor Author

@adatzer adatzer Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment, @istreeter, it made me look a bit deeper.

To start with what this PR does, before exposing these settings, we were implicitly using these defaults from the client library (1000 and 1e9) already. In this PR these are not changed, only made explicit, so that they can be configured.

This particular file shows 2000 and 2e9 as example values, the defaults have been and still are half of that. Still, as example values are valid (based on these recommendations).

Indeed NumGoRoutines is the number of streaming pull connections maintained by the client.
However as far as i can tell so far, the MaxOutstanding* settings are meant to throttle the throughput no matter how many goroutines for pulling messages there are.

So my understanding is that these flow control settings do limit the outstanding messages/bytes per subscriber client. For example, if max_outstanding_bytes is set to 2e9 then, as long as the setting is respected(caveats seem to exist), it will allow up to 2GB of outstanding size across all streaming pulls.

Here is how the client flow control acquires a message. This happens for each message, that was received for each goroutine/streaming pull.

Having said that, i'd be surprised if the java and go clients are different by such a margin in memory settings, which makes me doubt my understanding. Please do share any more pointers!

Copy link

@istreeter istreeter Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the java sdk, it also does exactly what you describe if "legacy flow control" is enabled, i.e. it uses a local FlowController which is shared across all streaming pulls.

But if it's using non-legacy flow control then it's slightly different. In the java sdk if you pick non-legacy flow control, then instead of using the local FlowController, it instead sets the options .setMaxOutstandingBytes on the streaming pull request. (Edit: Forgot to add link)

I notice the Go sdk also has an option UseLegacyFlowControl here on the ReceiveSettings.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate your comment about the purpose of this PR, and how this just makes explicit a setting that was previously non-configurable. That all makes sense 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these comments Ian - very helpful in my current investigation of pubsub issues and how these settings should be configured!

}
}
}
40 changes: 26 additions & 14 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@ import (

// Configuration configures the source for records pulled
type Configuration struct {
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"`
MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"`
}

// pubSubSource holds a new client for reading messages from PubSub
type pubSubSource struct {
projectID string
client *pubsub.Client
subscriptionID string
concurrentWrites int
projectID string
client *pubsub.Client
subscriptionID string
concurrentWrites int
maxOutstandingMessages int
maxOutstandingBytes int

log *log.Entry

Expand All @@ -52,6 +56,8 @@ func configFunction(c *Configuration) (sourceiface.Source, error) {
c.ConcurrentWrites,
c.ProjectID,
c.SubscriptionID,
c.MaxOutstandingMessages,
c.MaxOutstandingBytes,
)
}

Expand All @@ -68,7 +74,9 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &Configuration{
ConcurrentWrites: 50,
ConcurrentWrites: 50,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
}

return cfg, nil
Expand All @@ -93,7 +101,7 @@ var ConfigPair = config.ConfigurationPair{
}

// newPubSubSource creates a new client for reading messages from PubSub
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) {
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int) (*pubSubSource, error) {
ctx := context.Background()

// Ensures as even as possible distribution of UUIDs
Expand All @@ -105,11 +113,13 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
}

return &pubSubSource{
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
concurrentWrites: concurrentWrites,
log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}),
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
concurrentWrites: concurrentWrites,
maxOutstandingMessages: maxOutstandingMessages,
maxOutstandingBytes: maxOutstandingBytes,
log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}),
}, nil
}

Expand All @@ -121,6 +131,8 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error {

sub := ps.client.Subscription(ps.subscriptionID)
sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites
sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages
sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes

cctx, cancel := context.WithCancel(ctx)

Expand Down
4 changes: 2 additions & 2 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestNewPubSubSource_Success(t *testing.T) {

testutil.InitMockPubsubServer(8010, nil, t)

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9)
assert.Nil(err)
assert.IsType(&pubSubSource{}, pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
Expand All @@ -141,7 +141,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
}
wg.Wait()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9)

assert.NotNil(pubsubSource)
assert.Nil(err)
Expand Down
Loading