Skip to content

Commit

Permalink
Merge #67028
Browse files Browse the repository at this point in the history
67028: changefeedccl: implement concurrent execution for webhook sink r=spiffyyeng a=spiffyyeng

Previously, messages sent to the webhook sink were sent synchronously
one after another. This change improves performance by using concurrent
worker goroutines to send sink messages while maintaining ordering
guarantees.

Release note: None

Co-authored-by: Ryan Min <[email protected]>
  • Loading branch information
craig[bot] and spiffyy99 committed Jul 8, 2021
2 parents e003348 + b5bbe1e commit 36ed61e
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 200 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/errorutil",
"//pkg/util/hlc",
Expand All @@ -94,6 +95,7 @@ go_library(
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/system",
"//pkg/util/timeofday",
"//pkg/util/timeutil",
"//pkg/util/tracing",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func getSink(
case u.Scheme == changefeedbase.SinkSchemeKafka:
return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, acc)
case isWebhookSink(u):
return makeWebhookSink(ctx, sinkURL{URL: u}, feedCfg.Opts)
return makeWebhookSink(ctx, sinkURL{URL: u}, feedCfg.Opts, defaultWorkerCount())
case isCloudStorageSink(u):
return makeCloudStorageSink(
ctx, sinkURL{URL: u}, serverCfg.NodeID.SQLInstanceID(), serverCfg.Settings,
Expand Down
175 changes: 154 additions & 21 deletions pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"hash/crc32"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/system"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -63,14 +68,20 @@ func encodePayloadWebhook(value []byte) ([]byte, error) {
}

type webhookSink struct {
ctx context.Context
url sinkURL
authHeader string
client *httputil.Client
cancelFunc func()
ctx context.Context
url sinkURL
authHeader string
parallelism int
client *httputil.Client
workerGroup ctxgroup.Group
cancelFunc func()
eventsChans []chan []byte
inflight *inflightTracker
}

func makeWebhookSink(ctx context.Context, u sinkURL, opts map[string]string) (Sink, error) {
func makeWebhookSink(
ctx context.Context, u sinkURL, opts map[string]string, parallelism int,
) (Sink, error) {
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
}
Expand Down Expand Up @@ -115,9 +126,10 @@ func makeWebhookSink(ctx context.Context, u sinkURL, opts map[string]string) (Si
ctx, cancel := context.WithCancel(ctx)

sink := &webhookSink{
ctx: ctx,
cancelFunc: cancel,
authHeader: opts[changefeedbase.OptWebhookAuthHeader],
ctx: ctx,
authHeader: opts[changefeedbase.OptWebhookAuthHeader],
cancelFunc: cancel,
parallelism: parallelism,
}

var err error
Expand All @@ -137,6 +149,7 @@ func makeWebhookSink(ctx context.Context, u sinkURL, opts map[string]string) (Si
sinkURLParsed.RawQuery = params.Encode()
sink.url = sinkURL{URL: sinkURLParsed}

sink.inflight = makeInflightTracker()
return sink, nil
}

Expand Down Expand Up @@ -187,9 +200,43 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro
return client, nil
}

// Dial is a no-op for this sink since we don't necessarily have
// a "health check" endpoint to use.
// defaultWorkerCount() is the number of CPU's on the machine
func defaultWorkerCount() int {
return system.NumCPU()
}

func (s *webhookSink) setupWorkers() {
s.eventsChans = make([]chan []byte, s.parallelism)
s.workerGroup = ctxgroup.WithContext(s.ctx)
for i := 0; i < s.parallelism; i++ {
s.eventsChans[i] = make(chan []byte)
j := i
s.workerGroup.GoCtx(func(ctx context.Context) error {
s.workerLoop(s.eventsChans[j])
return nil
})
}
}

func (s *webhookSink) workerLoop(workerCh chan []byte) {
for {
select {
case <-s.ctx.Done():
return
case value := <-workerCh:
msg, err := encodePayloadWebhook(value)
if err == nil {
err = s.sendMessage(s.ctx, msg)
}
s.inflight.maybeSetError(err)
// reduce inflight count by one
s.inflight.FinishRequest()
}
}
}

func (s *webhookSink) Dial() error {
s.setupWorkers()
return nil
}

Expand Down Expand Up @@ -220,15 +267,88 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
return nil
}

// workerIndex assigns rows each to a worker goroutine based on the hash of its
// primary key. This is to ensure that each message with the same key gets
// deterministically assigned to the same worker. Since we have a channel per
// worker, we can ensure per-worker ordering and therefore guarantee per-key
// ordering.
func (s *webhookSink) workerIndex(key []byte) uint32 {
return crc32.ChecksumIEEE(key) % uint32(s.parallelism)
}

// TODO (ryan min): add memory monitoring for inflight messages
// inflightTracker wraps logic for counting number of inflight messages to
// track when flushing sink, with error handling functionality. Implemented
// as a wrapper for WaitGroup under a lock to block additional messages while
// flushing.
type inflightTracker struct {
inflightGroup sync.WaitGroup
errChan chan error
// lock used here to allow flush to block any new messages being enqueued
// from EmitRow or EmitResolvedTimestamp
flushMu syncutil.Mutex
}

func makeInflightTracker() *inflightTracker {
inflight := &inflightTracker{
inflightGroup: sync.WaitGroup{},
errChan: make(chan error, 1),
}
return inflight
}

// maybeSetError sets flushErr to be err if it has not already been set.
func (i *inflightTracker) maybeSetError(err error) {
if err == nil {
return
}
// errChan has buffer size 1, first error will be saved to the buffer and
// subsequent errors will be ignored
select {
case i.errChan <- err:
default:
}
}

// StartRequest enqueues one inflight message to be flushed.
func (i *inflightTracker) StartRequest() {
i.flushMu.Lock()
defer i.flushMu.Unlock()
i.inflightGroup.Add(1)
}

// FinishRequest tells the inflight tracker one message has been delivered.
func (i *inflightTracker) FinishRequest() {
i.inflightGroup.Done()
}

// Wait waits for all inflight messages to be delivered (inflight = 0) and
// returns a possible error. New messages delivered via EmitRow and
// EmitResolvedTimestamp will be blocked until this returns.
func (i *inflightTracker) Wait() error {
i.flushMu.Lock()
defer i.flushMu.Unlock()
i.inflightGroup.Wait()
var err error
select {
case err = <-i.errChan:
default:
}
return err
}

func (s *webhookSink) EmitRow(
ctx context.Context, _ TopicDescriptor, _, value []byte, _ hlc.Timestamp,
ctx context.Context, _ TopicDescriptor, key, value []byte, _ hlc.Timestamp,
) error {
j, err := encodePayloadWebhook(value)
if err != nil {
return err
}
s.inflight.StartRequest()

return s.sendMessage(ctx, j)
select {
case <-ctx.Done():
return ctx.Err()
// Errors resulting from sending the message will be expressed in Flush.
case s.eventsChans[s.workerIndex(key)] <- value:
}
return nil
}

func (s *webhookSink) EmitResolvedTimestamp(
Expand All @@ -238,17 +358,30 @@ func (s *webhookSink) EmitResolvedTimestamp(
if err != nil {
return err
}

return s.sendMessage(ctx, j)
s.inflight.StartRequest()

// do worker logic directly here instead (there's no point using workers for
// resolved timestamps since there are no keys and everything must be
// inflight order)
err = s.sendMessage(ctx, j)
s.inflight.maybeSetError(err)
s.inflight.FinishRequest()
return nil
}

// Flush() is a no-op for now since calls to EmitRow() are synchronous
// TODO (ryan min): Account for context cancellation.
func (s *webhookSink) Flush(ctx context.Context) error {
return nil
return s.inflight.Wait()
}

func (s *webhookSink) Close() error {
// ignore errors here since we're closing the sink anyway
_ = s.Flush(s.ctx)
s.cancelFunc()
_ = s.workerGroup.Wait()
for _, eventsChan := range s.eventsChans {
close(eventsChan)
}
s.client.CloseIdleConnections()
return nil
}
Expand Down
Loading

0 comments on commit 36ed61e

Please sign in to comment.