Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: copy request body when registering schemas, add schema registry retry metric #98135

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions pkg/ccl/changefeedccl/cdctest/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

// SchemaRegistry is the kafka schema registry used in tests.
type SchemaRegistry struct {
server *httptest.Server
mu struct {
server *httptest.Server
statusCode int
mu struct {
syncutil.Mutex
idAlloc int32
schemas map[int32]string
Expand All @@ -42,6 +43,15 @@ func StartTestSchemaRegistry() *SchemaRegistry {
return r
}

// StartErrorTestSchemaRegistry creates and starts schema registry for
// tests which will return the supplied statusCode on each request.
func StartErrorTestSchemaRegistry(statusCode int) *SchemaRegistry {
r := makeTestSchemaRegistry()
r.statusCode = statusCode
r.server.Start()
return r
}

// StartTestSchemaRegistryWithTLS creates and starts schema registry
// for tests with TLS enabled.
func StartTestSchemaRegistryWithTLS(
Expand Down Expand Up @@ -114,6 +124,11 @@ var (

// requestHandler routes requests based on the Method and Path of the request.
func (r *SchemaRegistry) requestHandler(hw http.ResponseWriter, hr *http.Request) {
if r.statusCode != 0 {
hw.WriteHeader(r.statusCode)
return
}

path := hr.URL.Path
method := hr.Method

Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,13 @@ func newChangeFrontierProcessor(
if err != nil {
return nil, err
}
if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed), makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB)); err != nil {

sliMertics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
return nil, err
}

if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed), makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,11 +562,12 @@ func createChangefeedJobRecord(
return nil, err
}

// Validate the encoder. We can pass an empty slimetrics struct here since the encoder will not be used.
encodingOpts, err := opts.GetEncodingOptions()
if err != nil {
return nil, err
}
if _, err := getEncoder(encodingOpts, AllTargets(details), makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB)); err != nil {
if _, err := getEncoder(encodingOpts, AllTargets(details), makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil); err != nil {
return nil, err
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ type Encoder interface {
}

func getEncoder(
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets, p externalConnectionProvider,
opts changefeedbase.EncodingOptions,
targets changefeedbase.Targets,
p externalConnectionProvider,
sliMetrics *sliMetrics,
) (Encoder, error) {
switch opts.Format {
case changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts)
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
return newConfluentAvroEncoder(opts, targets, p)
return newConfluentAvroEncoder(opts, targets, p, sliMetrics)
case changefeedbase.OptFormatCSV:
return newCSVEncoder(opts), nil
case changefeedbase.OptFormatParquet:
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/encoder_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ var encoderCacheConfig = cache.Config{
}

func newConfluentAvroEncoder(
opts changefeedbase.EncodingOptions, targets changefeedbase.Targets, p externalConnectionProvider,
opts changefeedbase.EncodingOptions,
targets changefeedbase.Targets,
p externalConnectionProvider,
sliMetrics *sliMetrics,
) (*confluentAvroEncoder, error) {
e := &confluentAvroEncoder{
schemaPrefix: opts.AvroSchemaPrefix,
Expand Down Expand Up @@ -102,7 +105,7 @@ func newConfluentAvroEncoder(
changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro)
}

reg, err := newConfluentSchemaRegistry(opts.SchemaRegistryURI, p)
reg, err := newConfluentSchemaRegistry(opts.SchemaRegistryURI, p, sliMetrics)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestEncoders(t *testing.T) {
return
}
require.NoError(t, o.Validate())
e, err := getEncoder(o, targets, nil)
e, err := getEncoder(o, targets, nil, nil)
require.NoError(t, err)

rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
StatementTimeName: changefeedbase.StatementTimeName(tableDesc.GetName()),
})

e, err := getEncoder(opts, targets, nil)
e, err := getEncoder(opts, targets, nil, nil)
require.NoError(t, err)

rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
defer noCertReg.Close()
opts.SchemaRegistryURI = noCertReg.URL()

enc, err := getEncoder(opts, targets, nil)
enc, err := getEncoder(opts, targets, nil, nil)
require.NoError(t, err)
_, err = enc.EncodeKey(context.Background(), rowInsert)
require.Regexp(t, "x509", err)
Expand All @@ -427,7 +427,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
defer wrongCertReg.Close()
opts.SchemaRegistryURI = wrongCertReg.URL()

enc, err = getEncoder(opts, targets, nil)
enc, err = getEncoder(opts, targets, nil, nil)
require.NoError(t, err)
_, err = enc.EncodeKey(context.Background(), rowInsert)
require.Regexp(t, `contacting confluent schema registry.*: x509`, err)
Expand Down Expand Up @@ -916,7 +916,8 @@ func BenchmarkEncoders(b *testing.B) {
bench := func(b *testing.B, fn encodeFn, opts changefeedbase.EncodingOptions, updatedRows, prevRows []cdcevent.Row) {
b.ReportAllocs()
b.StopTimer()
encoder, err := getEncoder(opts, targets, nil)

encoder, err := getEncoder(opts, targets, nil, nil)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newEventConsumer(

makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) {
var err error
encoder, err := getEncoder(encodingOpts, feed.Targets, makeExternalConnectionProvider(ctx, cfg.DB))
encoder, err := getEncoder(encodingOpts, feed.Targets, makeExternalConnectionProvider(ctx, cfg.DB), sliMetrics)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type AggMetrics struct {
RunningCount *aggmetric.AggGauge
BatchReductionCount *aggmetric.AggGauge
InternalRetryMessageCount *aggmetric.AggGauge
SchemaRegistryRetries *aggmetric.AggCounter

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
Expand Down Expand Up @@ -117,6 +118,7 @@ type sliMetrics struct {
RunningCount *aggmetric.Gauge
BatchReductionCount *aggmetric.Gauge
InternalRetryMessageCount *aggmetric.Gauge
SchemaRegistryRetries *aggmetric.Counter
}

// sinkDoesNotCompress is a sentinel value indicating the sink
Expand Down Expand Up @@ -494,6 +496,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaSchemaRegistryRetriesCount := metric.Metadata{
Name: "changefeed.schema_registry.retry_count",
Help: "Number of retries encountered when sending requests to the schema registry",
Measurement: "Retries",
Unit: metric.Unit_COUNT,
}
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
Expand Down Expand Up @@ -546,6 +554,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
RunningCount: b.Gauge(metaChangefeedRunning),
BatchReductionCount: b.Gauge(metaBatchReductionCount),
InternalRetryMessageCount: b.Gauge(metaInternalRetryMessageCount),
SchemaRegistryRetries: b.Counter(metaSchemaRegistryRetriesCount),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -601,6 +610,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
RunningCount: a.RunningCount.AddChild(scope),
BatchReductionCount: a.BatchReductionCount.AddChild(scope),
InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
}

a.mu.sliMetrics[scope] = sm
Expand Down
27 changes: 16 additions & 11 deletions pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type confluentSchemaRegistry struct {
// The current defaults for httputil.Client sets
// DisableKeepAlive's true so we don't have persistent
// connections to clean up on teardown.
client *httputil.Client
retryOpts retry.Options
client *httputil.Client
retryOpts retry.Options
sliMetrics *sliMetrics
}

var _ schemaRegistry = (*confluentSchemaRegistry)(nil)
Expand Down Expand Up @@ -97,7 +98,7 @@ func getAndDeleteParams(u *url.URL) (schemaRegistryParams, error) {
}

func newConfluentSchemaRegistry(
baseURL string, p externalConnectionProvider,
baseURL string, p externalConnectionProvider, sliMetrics *sliMetrics,
) (*confluentSchemaRegistry, error) {
u, err := url.Parse(baseURL)
if err != nil {
Expand All @@ -109,7 +110,7 @@ func newConfluentSchemaRegistry(
if err != nil {
return nil, err
}
return newConfluentSchemaRegistry(actual, p)
return newConfluentSchemaRegistry(actual, p, sliMetrics)
}

if u.Scheme != "http" && u.Scheme != "https" {
Expand All @@ -127,11 +128,12 @@ func newConfluentSchemaRegistry(
}

retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 2
retryOpts.MaxRetries = 5
return &confluentSchemaRegistry{
baseURL: u,
client: httpClient,
retryOpts: retryOpts,
baseURL: u,
client: httpClient,
retryOpts: retryOpts,
sliMetrics: sliMetrics,
}, nil
}

Expand Down Expand Up @@ -195,8 +197,8 @@ func (r *confluentSchemaRegistry) RegisterSchemaForSubject(
}

var id int32
err := r.doWithRetry(ctx, func() error {
resp, err := r.client.Post(ctx, u, confluentSchemaContentType, &buf)
err := r.doWithRetry(ctx, func() (e error) {
resp, err := r.client.Post(ctx, u, confluentSchemaContentType, bytes.NewReader(buf.Bytes()))
if err != nil {
return errors.Wrap(err, "contacting confluent schema registry")
}
Expand Down Expand Up @@ -236,7 +238,10 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err
if err == nil {
return nil
}
log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error())
if r.sliMetrics != nil {
r.sliMetrics.SchemaRegistryRetries.Inc(1)
}
log.VInfof(ctx, 1, "retrying schema registry operation: %s", err.Error())
}
return changefeedbase.MarkRetryableError(err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will let this PR cook for a few days before backporting. I'm worried that we'll get random roachtest failures bc of this.

}
Expand Down
49 changes: 41 additions & 8 deletions pkg/ccl/changefeedccl/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"errors"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand All @@ -24,12 +26,12 @@ func TestConfluentSchemaRegistry(t *testing.T) {
defer log.Scope(t).Close(t)

t.Run("errors with no scheme", func(t *testing.T) {
_, err := newConfluentSchemaRegistry("justsomestring", nil)
_, err := newConfluentSchemaRegistry("justsomestring", nil, nil)
require.Error(t, err)
})
t.Run("errors with unsupported scheme", func(t *testing.T) {
url := "gopher://myhost"
_, err := newConfluentSchemaRegistry(url, nil)
_, err := newConfluentSchemaRegistry(url, nil, nil)
require.Error(t, err)
})
}
Expand All @@ -56,16 +58,16 @@ func TestConfluentSchemaRegistryExternalConnection(t *testing.T) {
"bad_endpoint": "http://bad",
}

reg, err := newConfluentSchemaRegistry("external://good_endpoint", m)
reg, err := newConfluentSchemaRegistry("external://good_endpoint", m, nil)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()))

// We can load a bad endpoint, but ping should fail.
reg, err = newConfluentSchemaRegistry("external://bad_endpoint", m)
reg, err = newConfluentSchemaRegistry("external://bad_endpoint", m, nil)
require.NoError(t, err)
require.Error(t, reg.Ping(context.Background()))

_, err = newConfluentSchemaRegistry("external://no_endpoint", m)
_, err = newConfluentSchemaRegistry("external://no_endpoint", m, nil)
require.Error(t, err)

}
Expand All @@ -78,18 +80,49 @@ func TestConfluentSchemaRegistryPing(t *testing.T) {
defer regServer.Close()

t.Run("ping works when all is well", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry(regServer.URL(), nil)
reg, err := newConfluentSchemaRegistry(regServer.URL(), nil, nil)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()))
})
t.Run("ping does not error from HTTP 404", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry(regServer.URL()+"/path-does-not-exist-but-we-do-not-care", nil)
reg, err := newConfluentSchemaRegistry(regServer.URL()+"/path-does-not-exist-but-we-do-not-care", nil, nil)
require.NoError(t, err)
require.NoError(t, reg.Ping(context.Background()), "Ping")
})
t.Run("Ping errors with bad host", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry("http://host-does-exist-and-we-care", nil)
reg, err := newConfluentSchemaRegistry("http://host-does-exist-and-we-care", nil, nil)
require.NoError(t, err)
require.Error(t, reg.Ping(context.Background()))
})
}

// TestConfluentSchemaRegistryRetryMetrics verifies that we retry request to the schema registry
// at least 5 times.
func TestConfluentSchemaRegistryRetryMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

regServer := cdctest.StartErrorTestSchemaRegistry(409)
defer regServer.Close()

sliMetrics, err := MakeMetrics(base.DefaultHistogramWindowInterval()).(*Metrics).AggMetrics.getOrCreateScope("")
require.NoError(t, err)

t.Run("ping works when all is well", func(t *testing.T) {
reg, err := newConfluentSchemaRegistry(regServer.URL(), nil, sliMetrics)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
go func() {
_, err = reg.RegisterSchemaForSubject(ctx, "subject1", "schema1")
}()
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
if sliMetrics.SchemaRegistryRetries.Value() < 5 {
return errors.New("insufficient retries detected")
}
return nil
})
cancel()
})

}
6 changes: 6 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,12 @@ var charts = []sectionDescription{
"changefeed.internal_retry_message_count",
},
},
{
Title: "Schema Registry Retries",
Metrics: []string{
"changefeed.schema_registry.retry_count",
},
},
},
},
{
Expand Down