Skip to content

Commit

Permalink
feat(pubsub): add publisher flow control support (#4292)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex authored Sep 8, 2021
1 parent 9c9fbb2 commit bff24c3
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 120 deletions.
156 changes: 99 additions & 57 deletions pubsub/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,51 @@ package pubsub

import (
"context"
"errors"
"sync/atomic"

"golang.org/x/sync/semaphore"
)

// flowController implements flow control for Subscription.Receive.
// LimitExceededBehavior configures the behavior that flowController can use in case
// the flow control limits are exceeded.
type LimitExceededBehavior int

const (
// FlowControlBlock signals to wait until the request can be made without exceeding the limit.
FlowControlBlock LimitExceededBehavior = iota
// FlowControlIgnore disables flow control.
FlowControlIgnore
// FlowControlSignalError signals an error to the caller of acquire.
FlowControlSignalError
)

// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
// MaxOutstandingMessages is the maximum number of bufered messages to be published.
// If less than or equal to zero, this is disabled.
MaxOutstandingMessages int

// MaxOutstandingBytes is the maximum size of buffered messages to be published.
// If less than or equal to zero, this is disabled.
MaxOutstandingBytes int

// LimitExceededBehavior configures the behavior when trying to publish
// additional messages while the flow controller is full. The available options
// include Block (default), Ignore (disable), and SignalError (publish
// results will return an error).
LimitExceededBehavior LimitExceededBehavior
}

var (
// ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages.
ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")

// ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes.
ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
)

// flowController implements flow control for publishing and subscribing.
type flowController struct {
maxCount int
maxSize int // max total size of messages
Expand All @@ -33,93 +72,94 @@ type flowController struct {
countRemaining int64
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
limitBehavior LimitExceededBehavior
}

// newFlowController creates a new flowController that ensures no more than
// maxCount messages or maxSize bytes are outstanding at once. If maxCount or
// maxSize is < 1, then an unlimited number of messages or bytes is permitted,
// respectively.
func newFlowController(maxCount, maxSize int) *flowController {
fc := &flowController{
maxCount: maxCount,
maxSize: maxSize,
semCount: nil,
semSize: nil,
func newFlowController(fc FlowControlSettings) flowController {
f := flowController{
maxCount: fc.MaxOutstandingMessages,
maxSize: fc.MaxOutstandingBytes,
semCount: nil,
semSize: nil,
limitBehavior: fc.LimitExceededBehavior,
}
if maxCount > 0 {
fc.semCount = semaphore.NewWeighted(int64(maxCount))
if fc.MaxOutstandingMessages > 0 {
f.semCount = semaphore.NewWeighted(int64(fc.MaxOutstandingMessages))
}
if maxSize > 0 {
fc.semSize = semaphore.NewWeighted(int64(maxSize))
if fc.MaxOutstandingBytes > 0 {
f.semSize = semaphore.NewWeighted(int64(fc.MaxOutstandingBytes))
}
return fc
return f
}

// acquire blocks until one message of size bytes can proceed or ctx is done.
// It returns nil in the first case, or ctx.Err() in the second.
// acquire allocates space for a message: the message count and its size.
//
// acquire allows large messages to proceed by treating a size greater than maxSize
// In FlowControlSignalError mode, large messages greater than maxSize
// will be result in an error. In other modes, large messages will be treated
// as if it were equal to maxSize.
func (f *flowController) acquire(ctx context.Context, size int) error {
if f.semCount != nil {
if err := f.semCount.Acquire(ctx, 1); err != nil {
return err
switch f.limitBehavior {
case FlowControlIgnore:
return nil
case FlowControlBlock:
if f.semCount != nil {
if err := f.semCount.Acquire(ctx, 1); err != nil {
return err
}
}
}
if f.semSize != nil {
if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
if f.semCount != nil {
f.semCount.Release(1)
if f.semSize != nil {
if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
if f.semCount != nil {
f.semCount.Release(1)
}
return err
}
}
case FlowControlSignalError:
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return ErrFlowControllerMaxOutstandingMessages
}
}
if f.semSize != nil {
// Try to acquire the full size of the message here.
if !f.semSize.TryAcquire(int64(size)) {
if f.semCount != nil {
f.semCount.Release(1)
}
return ErrFlowControllerMaxOutstandingBytes
}
return err
}
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return nil
}

// tryAcquire returns false if acquire would block. Otherwise, it behaves like
// acquire and returns true.
//
// tryAcquire allows large messages to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return false
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
}
if f.semSize != nil {
if !f.semSize.TryAcquire(f.bound(size)) {
if f.semCount != nil {
f.semCount.Release(1)
}
return false
}
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
}
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)

return true
return nil
}

// release notes that one message of size bytes is no longer outstanding.
func (f *flowController) release(ctx context.Context, size int) {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
if f.limitBehavior == FlowControlIgnore {
return
}

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.semCount.Release(1)
}
if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.semSize.Release(f.bound(size))
}
}
Expand All @@ -131,6 +171,8 @@ func (f *flowController) bound(size int) int64 {
return int64(size)
}

// count returns the number of outstanding messages.
// if maxCount is 0, this will always return 0.
func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.countRemaining))
}
69 changes: 28 additions & 41 deletions pubsub/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ import (
"golang.org/x/sync/errgroup"
)

func fcSettings(c int, s int, l LimitExceededBehavior) FlowControlSettings {
return FlowControlSettings{
MaxOutstandingMessages: c,
MaxOutstandingBytes: s,
LimitExceededBehavior: l,
}
}

func TestFlowControllerCancel(t *testing.T) {
// Test canceling a flow controller's context.
t.Parallel()
fc := newFlowController(3, 10)
fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
if err := fc.acquire(context.Background(), 5); err != nil {
t.Fatal(err)
}
Expand All @@ -51,7 +59,7 @@ func TestFlowControllerCancel(t *testing.T) {
func TestFlowControllerLargeRequest(t *testing.T) {
// Large requests succeed, consuming the entire allotment.
t.Parallel()
fc := newFlowController(3, 10)
fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
err := fc.acquire(context.Background(), 11)
if err != nil {
t.Fatal(err)
Expand All @@ -64,7 +72,7 @@ func TestFlowControllerNoStarve(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fc := newFlowController(10, 10)
fc := newFlowController(fcSettings(10, 10, FlowControlBlock))
first := make(chan int)
for i := 0; i < 20; i++ {
go func() {
Expand Down Expand Up @@ -120,7 +128,7 @@ func TestFlowControllerSaturation(t *testing.T) {
wantSize: 9,
},
} {
fc := newFlowController(maxCount, maxSize)
fc := newFlowController(fcSettings(maxCount, maxSize, FlowControlBlock))
// Atomically track flow controller state.
// The flowController itself tracks count.
var curSize int64
Expand Down Expand Up @@ -174,60 +182,39 @@ func TestFlowControllerSaturation(t *testing.T) {
}
}

func TestFlowControllerTryAcquire(t *testing.T) {
t.Parallel()
fc := newFlowController(3, 10)
ctx := context.Background()

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}

// Fail to tryAcquire 7 bytes.
if fc.tryAcquire(ctx, 7) {
t.Error("got true, wanted false")
}

// Successfully tryAcquire 6 byte.
if !fc.tryAcquire(ctx, 6) {
t.Error("got false, wanted true")
}
}

func TestFlowControllerUnboundedCount(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(0, 10)
fc := newFlowController(fcSettings(0, 10, FlowControlSignalError))

// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Successfully tryAcquire 4 bytes.
if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
// Fail to acquire 3 bytes.
if err := fc.acquire(ctx, 3); err == nil {
t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingBytes)
}
}

func TestFlowControllerUnboundedCount2(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(0, 0)
fc := newFlowController(fcSettings(0, 0, FlowControlSignalError))
// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
fc.release(ctx, 1)
fc.release(ctx, 1)
fc.release(ctx, 1)
wantCount := int64(-2)
wantCount := int64(0)
c := int64(fc.count())
if c != wantCount {
t.Fatalf("got count %d, want %d", c, wantCount)
Expand All @@ -237,20 +224,20 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
func TestFlowControllerUnboundedBytes(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(2, 0)
fc := newFlowController(fcSettings(2, 0, FlowControlSignalError))

// Successfully acquire 4GB.
if err := fc.acquire(ctx, 4e9); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Successfully tryAcquire 4GB bytes.
if !fc.tryAcquire(ctx, 4e9) {
t.Error("got false, wanted true")
// Successfully acquired 4GB bytes.
if err := fc.acquire(ctx, 4e9); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Fail to tryAcquire a third message.
if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
// Fail to acquire a third message.
if err := fc.acquire(ctx, 3); err == nil {
t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages)
}
}
1 change: 1 addition & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func TestIntegration_LargePublishSize(t *testing.T) {
msg := &Message{
Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
}
topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError
r := topic.Publish(ctx, msg)
if _, err := r.Get(ctx); err != nil {
t.Fatalf("Failed to publish max length message: %v", err)
Expand Down
8 changes: 6 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(maxCount, maxBytes)
fc := newFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
MaxOutstandingBytes: maxBytes,
LimitExceededBehavior: FlowControlBlock,
})

sched := scheduler.NewReceiveScheduler(maxCount)

Expand Down Expand Up @@ -982,7 +986,7 @@ func (s *Subscription) checkOrdering() {
type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
maxPrefetch int32 // the max number of outstanding messages, used to calculate maxToPull
// If true, use unary Pull instead of StreamingPull, and never pull more
// than maxPrefetch messages.
synchronous bool
Expand Down
Loading

0 comments on commit bff24c3

Please sign in to comment.