From 3b083ade2ce0d180e9ede1761885de740543e0e7 Mon Sep 17 00:00:00 2001 From: Rodolfo Carvalho Date: Fri, 21 May 2021 15:52:59 +0200 Subject: [PATCH] feat(transports): Category-based Rate Limiting (#354) This adds support for parsing the X-Sentry-Rate-Limits and rate limiting errors and transactions independently. --- internal/ratelimit/category.go | 41 ++++ internal/ratelimit/category_test.go | 25 +++ internal/ratelimit/deadline.go | 22 ++ internal/ratelimit/doc.go | 3 + internal/ratelimit/map.go | 64 ++++++ internal/ratelimit/map_test.go | 269 +++++++++++++++++++++++++ internal/ratelimit/rate_limits.go | 76 +++++++ internal/ratelimit/rate_limits_test.go | 150 ++++++++++++++ internal/ratelimit/retry_after.go | 40 ++++ internal/ratelimit/retry_after_test.go | 58 ++++++ transport.go | 128 +++++++----- transport_test.go | 108 +++++++--- 12 files changed, 904 insertions(+), 80 deletions(-) create mode 100644 internal/ratelimit/category.go create mode 100644 internal/ratelimit/category_test.go create mode 100644 internal/ratelimit/deadline.go create mode 100644 internal/ratelimit/doc.go create mode 100644 internal/ratelimit/map.go create mode 100644 internal/ratelimit/map_test.go create mode 100644 internal/ratelimit/rate_limits.go create mode 100644 internal/ratelimit/rate_limits_test.go create mode 100644 internal/ratelimit/retry_after.go create mode 100644 internal/ratelimit/retry_after_test.go diff --git a/internal/ratelimit/category.go b/internal/ratelimit/category.go new file mode 100644 index 000000000..44d0a5688 --- /dev/null +++ b/internal/ratelimit/category.go @@ -0,0 +1,41 @@ +package ratelimit + +import "strings" + +// Reference: +// https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-common/src/constants.rs#L116-L127 + +// Category classifies supported payload types that can be ingested by Sentry +// and, therefore, rate limited. +type Category string + +// Known rate limit categories. 
As a special case, the CategoryAll applies to +// all known payload types. +const ( + CategoryAll Category = "" + CategoryError Category = "error" + CategoryTransaction Category = "transaction" +) + +// knownCategories is the set of currently known categories. Other categories +// are ignored for the purpose of rate-limiting. +var knownCategories = map[Category]struct{}{ + CategoryAll: {}, + CategoryError: {}, + CategoryTransaction: {}, +} + +// String returns the category formatted for debugging. +func (c Category) String() string { + switch c { + case "": + return "CategoryAll" + default: + var b strings.Builder + b.WriteString("Category") + for _, w := range strings.Fields(string(c)) { + b.WriteString(strings.Title(w)) + } + return b.String() + } +} diff --git a/internal/ratelimit/category_test.go b/internal/ratelimit/category_test.go new file mode 100644 index 000000000..7b686449c --- /dev/null +++ b/internal/ratelimit/category_test.go @@ -0,0 +1,25 @@ +package ratelimit + +import "testing" + +func TestCategoryString(t *testing.T) { + tests := []struct { + Category + want string + }{ + {CategoryAll, "CategoryAll"}, + {CategoryError, "CategoryError"}, + {CategoryTransaction, "CategoryTransaction"}, + {Category("unknown"), "CategoryUnknown"}, + {Category("two words"), "CategoryTwoWords"}, + } + for _, tt := range tests { + tt := tt + t.Run(tt.want, func(t *testing.T) { + got := tt.Category.String() + if got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/ratelimit/deadline.go b/internal/ratelimit/deadline.go new file mode 100644 index 000000000..c00258335 --- /dev/null +++ b/internal/ratelimit/deadline.go @@ -0,0 +1,22 @@ +package ratelimit + +import "time" + +// A Deadline is a time instant when a rate limit expires. +type Deadline time.Time + +// After reports whether the deadline d is after other. 
+func (d Deadline) After(other Deadline) bool { + return time.Time(d).After(time.Time(other)) +} + +// Equal reports whether d and e represent the same deadline. +func (d Deadline) Equal(e Deadline) bool { + return time.Time(d).Equal(time.Time(e)) +} + +// String returns the deadline formatted for debugging. +func (d Deadline) String() string { + // Like time.Time.String, but without the monotonic clock reading. + return time.Time(d).Round(0).String() +} diff --git a/internal/ratelimit/doc.go b/internal/ratelimit/doc.go new file mode 100644 index 000000000..80b9fdda2 --- /dev/null +++ b/internal/ratelimit/doc.go @@ -0,0 +1,3 @@ +// Package ratelimit provides tools to work with rate limits imposed by Sentry's +// data ingestion pipeline. +package ratelimit diff --git a/internal/ratelimit/map.go b/internal/ratelimit/map.go new file mode 100644 index 000000000..e590430ec --- /dev/null +++ b/internal/ratelimit/map.go @@ -0,0 +1,64 @@ +package ratelimit + +import ( + "net/http" + "time" +) + +// Map maps categories to rate limit deadlines. +// +// A rate limit is in effect for a given category if either the category's +// deadline or the deadline for the special CategoryAll has not yet expired. +// +// Use IsRateLimited to check whether a category is rate-limited. +type Map map[Category]Deadline + +// IsRateLimited returns true if the category is currently rate limited. +func (m Map) IsRateLimited(c Category) bool { + return m.isRateLimited(c, time.Now()) +} + +func (m Map) isRateLimited(c Category, now time.Time) bool { + return m.Deadline(c).After(Deadline(now)) +} + +// Deadline returns the deadline when the rate limit for the given category or +// the special CategoryAll expires, whichever is furthest into the future. +func (m Map) Deadline(c Category) Deadline { + categoryDeadline := m[c] + allDeadline := m[CategoryAll] + if categoryDeadline.After(allDeadline) { + return categoryDeadline + } + return allDeadline +} + +// Merge merges the other map into m. 
+// +// If a category appears in both maps, the deadline that is furthest into the +// future is preserved. +func (m Map) Merge(other Map) { + for c, d := range other { + if d.After(m[c]) { + m[c] = d + } + } +} + +// FromResponse returns a rate limit map from an HTTP response. +func FromResponse(r *http.Response) Map { + return fromResponse(r, time.Now()) +} + +func fromResponse(r *http.Response, now time.Time) Map { + s := r.Header.Get("X-Sentry-Rate-Limits") + if s != "" { + return parseXSentryRateLimits(s, now) + } + if r.StatusCode == http.StatusTooManyRequests { + s := r.Header.Get("Retry-After") + deadline, _ := parseRetryAfter(s, now) + return Map{CategoryAll: deadline} + } + return Map{} +} diff --git a/internal/ratelimit/map_test.go b/internal/ratelimit/map_test.go new file mode 100644 index 000000000..4e6b555e0 --- /dev/null +++ b/internal/ratelimit/map_test.go @@ -0,0 +1,269 @@ +package ratelimit + +import ( + "net/http" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +func TestFromResponse(t *testing.T) { + tests := []struct { + name string + response *http.Response + want Map + }{ + { + "200 no rate limit", + &http.Response{ + StatusCode: http.StatusOK, + }, + Map{}, + }, + { + "200 ignored Retry-After", + &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "Retry-After": []string{"100"}, // ignored + }, + }, + Map{}, + }, + { + "200 Retry-After + X-Sentry-Rate-Limits", + &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "Retry-After": []string{"100"}, // ignored + "X-Sentry-Rate-Limits": []string{"50:transaction"}, + }, + }, + Map{CategoryTransaction: Deadline(now.Add(50 * time.Second))}, + }, + { + "200 X-Sentry-Rate-Limits", + &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{ + "X-Sentry-Rate-Limits": []string{"50:transaction"}, + }, + }, + Map{CategoryTransaction: Deadline(now.Add(50 * time.Second))}, + }, + { + "429 no rate limit, use default", + &http.Response{ + StatusCode: 
http.StatusTooManyRequests, + }, + Map{CategoryAll: Deadline(now.Add(defaultRetryAfter))}, + }, + { + "429 Retry-After", + &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{ + "Retry-After": []string{"100"}, + }, + }, + Map{CategoryAll: Deadline(now.Add(100 * time.Second))}, + }, + { + "429 X-Sentry-Rate-Limits", + &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{ + "X-Sentry-Rate-Limits": []string{"50:error"}, + }, + }, + Map{CategoryError: Deadline(now.Add(50 * time.Second))}, + }, + { + "429 Retry-After + X-Sentry-Rate-Limits", + &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{ + "Retry-After": []string{"100"}, // ignored + "X-Sentry-Rate-Limits": []string{"50:error"}, + }, + }, + Map{CategoryError: Deadline(now.Add(50 * time.Second))}, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + got := fromResponse(tt.response, now) + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("(-want +got):\n%s", diff) + } + }) + } +} + +func TestMapDeadlineIsRateLimited(t *testing.T) { + noDeadline := Deadline{} + plus5s := Deadline(now.Add(5 * time.Second)) + plus10s := Deadline(now.Add(10 * time.Second)) + future := now.Add(time.Hour) + + tests := []struct { + name string + m Map + want map[Category]Deadline + }{ + { + "Empty map = no deadlines", + Map{}, + map[Category]Deadline{ + CategoryAll: noDeadline, + CategoryError: noDeadline, + CategoryTransaction: noDeadline, + Category("unknown"): noDeadline, + }, + }, + { + "Only one category", + Map{ + CategoryError: plus5s, + }, + map[Category]Deadline{ + CategoryAll: noDeadline, + CategoryError: plus5s, + CategoryTransaction: noDeadline, + Category("unknown"): noDeadline, + }, + }, + { + "Only CategoryAll", + Map{ + CategoryAll: plus5s, + }, + map[Category]Deadline{ + CategoryAll: plus5s, + CategoryError: plus5s, + CategoryTransaction: plus5s, + Category("unknown"): plus5s, + }, + }, + { + 
"Two categories", + Map{ + CategoryError: plus5s, + CategoryTransaction: plus10s, + }, + map[Category]Deadline{ + CategoryAll: noDeadline, + CategoryError: plus5s, + CategoryTransaction: plus10s, + Category("unknown"): noDeadline, + }, + }, + { + "CategoryAll earlier", + Map{ + CategoryAll: plus5s, + CategoryTransaction: plus10s, + }, + map[Category]Deadline{ + CategoryAll: plus5s, + CategoryError: plus5s, + CategoryTransaction: plus10s, + Category("unknown"): plus5s, + }, + }, + { + "CategoryAll later", + Map{ + CategoryAll: plus10s, + CategoryTransaction: plus5s, + }, + map[Category]Deadline{ + CategoryAll: plus10s, + CategoryError: plus10s, + CategoryTransaction: plus10s, + Category("unknown"): plus10s, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for c, want := range tt.want { + got := tt.m.Deadline(c) + if got != want { + t.Fatalf("Deadline(%v): got %v, want %v", c, got, want) + } + limited := tt.m.isRateLimited(c, now) + wantLimited := want != noDeadline + if limited != wantLimited { + t.Errorf("isRateLimited(%v, now): got %v, want %v", c, limited, wantLimited) + } + // Nothing should be rate-limited in the future + limited = tt.m.isRateLimited(c, future) + wantLimited = false + if limited != wantLimited { + t.Errorf("isRateLimited(%v, future): got %v, want %v", c, limited, wantLimited) + } + } + }) + } +} + +func TestMapMerge(t *testing.T) { + tests := []struct { + name string + old, new Map + want Map + }{ + { + name: "both empty", + old: Map{}, + new: Map{}, + want: Map{}, + }, + { + name: "old empty", + old: Map{}, + new: Map{CategoryError: Deadline(now)}, + want: Map{CategoryError: Deadline(now)}, + }, + { + name: "new empty", + old: Map{CategoryError: Deadline(now)}, + new: Map{}, + want: Map{CategoryError: Deadline(now)}, + }, + { + name: "no overlap = union", + old: Map{CategoryTransaction: Deadline(now)}, + new: Map{CategoryError: Deadline(now)}, + want: Map{ + CategoryTransaction: Deadline(now), + 
CategoryError: Deadline(now), + }, + }, + { + name: "overlap keep old", + old: Map{CategoryError: Deadline(now.Add(time.Minute))}, + new: Map{CategoryError: Deadline(now)}, + want: Map{CategoryError: Deadline(now.Add(time.Minute))}, + }, + { + name: "overlap replace with new", + old: Map{CategoryError: Deadline(now)}, + new: Map{CategoryError: Deadline(now.Add(time.Minute))}, + want: Map{CategoryError: Deadline(now.Add(time.Minute))}, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + tt.old.Merge(tt.new) + if diff := cmp.Diff(tt.want, tt.old); diff != "" { + t.Errorf("(-want +got):\n%s", diff) + } + }) + } +} diff --git a/internal/ratelimit/rate_limits.go b/internal/ratelimit/rate_limits.go new file mode 100644 index 000000000..2dcda27c2 --- /dev/null +++ b/internal/ratelimit/rate_limits.go @@ -0,0 +1,76 @@ +package ratelimit + +import ( + "errors" + "math" + "strconv" + "strings" + "time" +) + +var errInvalidXSRLRetryAfter = errors.New("invalid retry-after value") + +// parseXSentryRateLimits returns a RateLimits map by parsing an input string in +// the format of the X-Sentry-Rate-Limits header. +// +// Example +// +// X-Sentry-Rate-Limits: 60:transaction, 2700:default;error;security +// +// This will rate limit transactions for the next 60 seconds and errors for the +// next 2700 seconds. +// +// Limits for unknown categories are ignored. 
+func parseXSentryRateLimits(s string, now time.Time) Map { + // https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-server/src/utils/rate_limits.rs#L44-L82 + m := make(Map, len(knownCategories)) + for _, limit := range strings.Split(s, ",") { + limit = strings.TrimSpace(limit) + if limit == "" { + continue + } + components := strings.Split(limit, ":") + if len(components) == 0 { + continue + } + retryAfter, err := parseXSRLRetryAfter(strings.TrimSpace(components[0]), now) + if err != nil { + continue + } + categories := "" + if len(components) > 1 { + categories = components[1] + } + for _, category := range strings.Split(categories, ";") { + c := Category(strings.ToLower(strings.TrimSpace(category))) + if _, ok := knownCategories[c]; !ok { + // skip unknown categories, keep m small + continue + } + // always keep the deadline furthest into the future + if retryAfter.After(m[c]) { + m[c] = retryAfter + } + } + } + return m +} + +// parseXSRLRetryAfter parses a string into a retry-after rate limit deadline. +// +// Valid input is a number, possibly signed and possibly floating-point, +// indicating the number of seconds to wait before sending another request. +// Negative values are treated as zero. Fractional values are rounded to the +// next integer. 
+func parseXSRLRetryAfter(s string, now time.Time) (Deadline, error) { + // https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-quotas/src/rate_limit.rs#L88-L96 + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return Deadline{}, errInvalidXSRLRetryAfter + } + d := time.Duration(math.Ceil(math.Max(f, 0.0))) * time.Second + if d < 0 { + d = 0 + } + return Deadline(now.Add(d)), nil +} diff --git a/internal/ratelimit/rate_limits_test.go b/internal/ratelimit/rate_limits_test.go new file mode 100644 index 000000000..78cd64d2e --- /dev/null +++ b/internal/ratelimit/rate_limits_test.go @@ -0,0 +1,150 @@ +package ratelimit + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +var now = time.Date(2008, 5, 12, 16, 26, 19, 0, time.UTC) + +func TestParseXSentryRateLimits(t *testing.T) { + tests := []struct { + input string + wantLimits Map + }{ + // Empty rate limits == nothing is rate-limited + {"", Map{}}, + {",", Map{}}, + {",,,,", Map{}}, + {", , , ,", Map{}}, + {":", Map{}}, + {":::", Map{}}, + {"::,,:,", Map{}}, + {":,:;;;:", Map{}}, + + { + "1", + Map{CategoryAll: Deadline(now.Add(1 * time.Second))}, + }, + { + "2::ignored_scope:ignored_reason", + Map{CategoryAll: Deadline(now.Add(2 * time.Second))}, + }, + { + "3::ignored_scope:ignored_reason", + Map{CategoryAll: Deadline(now.Add(3 * time.Second))}, + }, + + { + "4:error", + Map{CategoryError: Deadline(now.Add(4 * time.Second))}, + }, + { + "5:error;transaction", + Map{ + CategoryError: Deadline(now.Add(5 * time.Second)), + CategoryTransaction: Deadline(now.Add(5 * time.Second)), + }, + }, + { + "6:error, 7:transaction", + Map{ + CategoryError: Deadline(now.Add(6 * time.Second)), + CategoryTransaction: Deadline(now.Add(7 * time.Second)), + }, + }, + { + // ignore unknown categories + "8:error;default;unknown", + Map{CategoryError: Deadline(now.Add(8 * time.Second))}, + }, + { + "30:error:scope1, 20:error:scope2, 40:error", + Map{CategoryError: 
Deadline(now.Add(40 * time.Second))}, + }, + { + "30:error:scope1, 20:error:scope2, 40::", + Map{ + CategoryAll: Deadline(now.Add(40 * time.Second)), + CategoryError: Deadline(now.Add(30 * time.Second)), + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(fmt.Sprintf("%q", tt.input), func(t *testing.T) { + got, want := parseXSentryRateLimits(tt.input, now), tt.wantLimits + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("(-want +got):\n%s", diff) + } + }) + } +} +func TestParseXSRLRetryAfterValidInput(t *testing.T) { + // https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-quotas/src/rate_limit.rs#L88-L96 + tests := []struct { + input string + want Deadline + }{ + // Integers are the common case + {"0", Deadline(now)}, + {"1", Deadline(now.Add(1 * time.Second))}, + {"60", Deadline(now.Add(1 * time.Minute))}, + + // Any fractional increment round up to the next full second + // (replicating implementation in getsentry/relay) + {"3.1", Deadline(now.Add(4 * time.Second))}, + {"3.5", Deadline(now.Add(4 * time.Second))}, + {"3.9", Deadline(now.Add(4 * time.Second))}, + + // Overflows are treated like zero + {"100000000000000000", Deadline(now)}, + + // Negative numbers are treated like zero + {"-Inf", Deadline(now)}, + {"-0", Deadline(now)}, + {"-1", Deadline(now)}, + + // Special floats are treated like zero + {"Inf", Deadline(now)}, + {"NaN", Deadline(now)}, + } + for _, tt := range tests { + tt := tt + t.Run(fmt.Sprintf("%q", tt.input), func(t *testing.T) { + d, err := parseXSRLRetryAfter(tt.input, now) + if err != nil { + t.Fatalf("got %v, want nil", err) + } + got, want := time.Time(d), time.Time(tt.want) + if !got.Equal(want) { + t.Errorf("got %v, want %v", got, want) + } + }) + } +} + +func TestParseXSRLRetryAfterInvalidInput(t *testing.T) { + // https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-quotas/src/rate_limit.rs#L88-L96 + tests := []struct { + input string + }{ + 
{""}, + {"invalid"}, + {" 2 "}, + {"6 0"}, + } + for _, tt := range tests { + tt := tt + t.Run(tt.input, func(t *testing.T) { + _, err := parseXSRLRetryAfter(tt.input, now) + if err == nil { + t.Fatalf("got %v, want nil", err) + } + t.Log(err) + }) + } +} diff --git a/internal/ratelimit/retry_after.go b/internal/ratelimit/retry_after.go new file mode 100644 index 000000000..576e29dcd --- /dev/null +++ b/internal/ratelimit/retry_after.go @@ -0,0 +1,40 @@ +package ratelimit + +import ( + "errors" + "strconv" + "time" +) + +const defaultRetryAfter = 1 * time.Minute + +var errInvalidRetryAfter = errors.New("invalid input") + +// parseRetryAfter parses a string s as in the standard Retry-After HTTP header +// and returns a deadline until when requests are rate limited and therefore new +// requests should not be sent. The input may be either a date or a non-negative +// integer number of seconds. +// +// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After +// +// parseRetryAfter always returns a usable deadline, even in case of an error. +// +// This is the original rate limiting mechanism used by Sentry, superseded by +// the X-Sentry-Rate-Limits response header. 
+func parseRetryAfter(s string, now time.Time) (Deadline, error) { + if s == "" { + goto invalid + } + if n, err := strconv.Atoi(s); err == nil { + if n < 0 { + goto invalid + } + d := time.Duration(n) * time.Second + return Deadline(now.Add(d)), nil + } + if date, err := time.Parse(time.RFC1123, s); err == nil { + return Deadline(date), nil + } +invalid: + return Deadline(now.Add(defaultRetryAfter)), errInvalidRetryAfter +} diff --git a/internal/ratelimit/retry_after_test.go b/internal/ratelimit/retry_after_test.go new file mode 100644 index 000000000..82b5e5be0 --- /dev/null +++ b/internal/ratelimit/retry_after_test.go @@ -0,0 +1,58 @@ +package ratelimit + +import ( + "testing" + "time" +) + +func TestParseRetryAfter(t *testing.T) { + defaultDeadline := Deadline(now.Add(defaultRetryAfter)) + tests := map[string]struct { + input string + want Deadline + wantErr bool // if true, want is set to defaultDeadline + }{ + // Invalid input + "Empty": { + input: "", + wantErr: true, + }, + "BadString": { + input: "x", + wantErr: true, + }, + "Negative": { + input: "-1", + wantErr: true, + }, + "Float": { + input: "5.0", + wantErr: true, + }, + // Valid input + "Integer": { + input: "1337", + want: Deadline(now.Add(1337 * time.Second)), + }, + "Date": { + input: "Fri, 08 Mar 2019 11:17:09 GMT", + want: Deadline(time.Date(2019, 3, 8, 11, 17, 9, 0, time.UTC)), + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + got, err := parseRetryAfter(tt.input, now) + want := tt.want + if tt.wantErr { + want = defaultDeadline + if err == nil { + t.Errorf("got err = nil, want non-nil") + } + } + if !got.Equal(want) { + t.Errorf("got %v, want %v", got, want) + } + }) + } +} diff --git a/transport.go b/transport.go index d5d241bad..2d5a22af2 100644 --- a/transport.go +++ b/transport.go @@ -10,13 +10,13 @@ import ( "io/ioutil" "net/http" "net/url" - "strconv" "sync" "time" + + "github.com/getsentry/sentry-go/internal/ratelimit" ) const defaultBufferSize = 
30 -const defaultRetryAfter = time.Second * 60 const defaultTimeout = time.Second * 30 // maxDrainResponseBytes is the maximum number of bytes that transport @@ -61,26 +61,6 @@ func getTLSConfig(options ClientOptions) *tls.Config { return nil } -func retryAfter(now time.Time, h http.Header) time.Duration { - // TODO(tracing): handle x-sentry-rate-limits, separate rate limiting - // per data type (error event, transaction, etc). - retryAfterHeader := h["Retry-After"] - - if retryAfterHeader == nil { - return defaultRetryAfter - } - - if date, err := time.Parse(time.RFC1123, retryAfterHeader[0]); err == nil { - return date.Sub(now) - } - - if seconds, err := strconv.Atoi(retryAfterHeader[0]); err == nil { - return time.Second * time.Duration(seconds) - } - - return defaultRetryAfter -} - func getRequestBodyFromEvent(event *Event) []byte { body, err := json.Marshal(event) if err == nil { @@ -167,17 +147,33 @@ func getRequestFromEvent(event *Event, dsn *Dsn) (*http.Request, error) { ) } +func categoryFor(eventType string) ratelimit.Category { + switch eventType { + case "": + return ratelimit.CategoryError + case transactionType: + return ratelimit.CategoryTransaction + default: + return ratelimit.Category(eventType) + } +} + // ================================ // HTTPTransport // ================================ // A batch groups items that are processed sequentially. type batch struct { - items chan *http.Request + items chan batchItem started chan struct{} // closed to signal items started to be worked on done chan struct{} // closed to signal completion of all items } +type batchItem struct { + request *http.Request + category ratelimit.Category +} + // HTTPTransport is the default, non-blocking, implementation of Transport. // // Clients using this transport will enqueue requests in a buffer and return to @@ -199,8 +195,8 @@ type HTTPTransport struct { // HTTP Client request timeout. Defaults to 30 seconds. 
Timeout time.Duration - mu sync.RWMutex - disabledUntil time.Time + mu sync.RWMutex + limits ratelimit.Map } // NewHTTPTransport returns a new pre-configured instance of HTTPTransport. @@ -208,6 +204,7 @@ func NewHTTPTransport() *HTTPTransport { transport := HTTPTransport{ BufferSize: defaultBufferSize, Timeout: defaultTimeout, + limits: make(ratelimit.Map), } return &transport } @@ -226,7 +223,7 @@ func (t *HTTPTransport) Configure(options ClientOptions) { // synchronized by reading from and writing to the channel. t.buffer = make(chan batch, 1) t.buffer <- batch{ - items: make(chan *http.Request, t.BufferSize), + items: make(chan batchItem, t.BufferSize), started: make(chan struct{}), done: make(chan struct{}), } @@ -259,10 +256,10 @@ func (t *HTTPTransport) SendEvent(event *Event) { if t.dsn == nil { return } - t.mu.RLock() - disabled := time.Now().Before(t.disabledUntil) - t.mu.RUnlock() - if disabled { + + category := categoryFor(event.Type) + + if t.disabled(category) { return } @@ -289,7 +286,10 @@ func (t *HTTPTransport) SendEvent(event *Event) { b := <-t.buffer select { - case b.items <- request: + case b.items <- batchItem{ + request: request, + category: category, + }: var eventType string if event.Type == transactionType { eventType = "transaction" @@ -350,7 +350,7 @@ started: close(b.items) // Start a new batch for subsequent events. t.buffer <- batch{ - items: make(chan *http.Request, t.BufferSize), + items: make(chan batchItem, t.BufferSize), started: make(chan struct{}), done: make(chan struct{}), } @@ -379,26 +379,19 @@ func (t *HTTPTransport) worker() { t.buffer <- b // Process all batch items. 
- for request := range b.items { - t.mu.RLock() - disabled := time.Now().Before(t.disabledUntil) - t.mu.RUnlock() - if disabled { + for item := range b.items { + if t.disabled(item.category) { continue } - response, err := t.client.Do(request) + response, err := t.client.Do(item.request) if err != nil { Logger.Printf("There was an issue with sending an event: %v", err) continue } - if response.StatusCode == http.StatusTooManyRequests { - deadline := time.Now().Add(retryAfter(time.Now(), response.Header)) - t.mu.Lock() - t.disabledUntil = deadline - t.mu.Unlock() - Logger.Printf("Too many requests, backing off till: %s\n", deadline) - } + t.mu.Lock() + t.limits.Merge(ratelimit.FromResponse(response)) + t.mu.Unlock() // Drain body up to a limit and close it, allowing the // transport to reuse TCP connections. _, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes) @@ -410,6 +403,16 @@ func (t *HTTPTransport) worker() { } } +func (t *HTTPTransport) disabled(c ratelimit.Category) bool { + t.mu.RLock() + defer t.mu.RUnlock() + disabled := t.limits.IsRateLimited(c) + if disabled { + Logger.Printf("Too many requests for %q, backing off till: %v", c, t.limits.Deadline(c)) + } + return disabled +} + // ================================ // HTTPSyncTransport // ================================ @@ -426,10 +429,12 @@ func (t *HTTPTransport) worker() { // // For most cases, prefer HTTPTransport. type HTTPSyncTransport struct { - dsn *Dsn - client *http.Client - transport http.RoundTripper - disabledUntil time.Time + dsn *Dsn + client *http.Client + transport http.RoundTripper + + mu sync.Mutex + limits ratelimit.Map // HTTP Client request timeout. Defaults to 30 seconds. 
Timeout time.Duration @@ -439,6 +444,7 @@ type HTTPSyncTransport struct { func NewHTTPSyncTransport() *HTTPSyncTransport { transport := HTTPSyncTransport{ Timeout: defaultTimeout, + limits: make(ratelimit.Map), } return &transport @@ -474,7 +480,11 @@ func (t *HTTPSyncTransport) Configure(options ClientOptions) { // SendEvent assembles a new packet out of Event and sends it to remote server. func (t *HTTPSyncTransport) SendEvent(event *Event) { - if t.dsn == nil || time.Now().Before(t.disabledUntil) { + if t.dsn == nil { + return + } + + if t.disabled(categoryFor(event.Type)) { return } @@ -506,10 +516,10 @@ func (t *HTTPSyncTransport) SendEvent(event *Event) { Logger.Printf("There was an issue with sending an event: %v", err) return } - if response.StatusCode == http.StatusTooManyRequests { - t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response.Header)) - Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil) - } + t.mu.Lock() + t.limits.Merge(ratelimit.FromResponse(response)) + t.mu.Unlock() + // Drain body up to a limit and close it, allowing the // transport to reuse TCP connections. 
_, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes) @@ -521,6 +531,16 @@ func (t *HTTPSyncTransport) Flush(_ time.Duration) bool { return true } +func (t *HTTPSyncTransport) disabled(c ratelimit.Category) bool { + t.mu.Lock() + defer t.mu.Unlock() + disabled := t.limits.IsRateLimited(c) + if disabled { + Logger.Printf("Too many requests for %q, backing off till: %v", c, t.limits.Deadline(c)) + } + return disabled +} + // ================================ // noopTransport // ================================ diff --git a/transport_test.go b/transport_test.go index 1844d592d..efcee7d2e 100644 --- a/transport_test.go +++ b/transport_test.go @@ -1,8 +1,10 @@ package sentry import ( + "bytes" "encoding/json" "fmt" + "io/ioutil" "net/http" "net/http/httptest" "net/http/httptrace" @@ -197,32 +199,6 @@ func TestGetRequestFromEvent(t *testing.T) { } } -func TestRetryAfterNoHeader(t *testing.T) { - assertEqual(t, retryAfter(time.Now(), nil), time.Second*60) -} - -func TestRetryAfterIncorrectHeader(t *testing.T) { - h := http.Header{ - "Retry-After": {"x"}, - } - assertEqual(t, retryAfter(time.Now(), h), time.Second*60) -} - -func TestRetryAfterDelayHeader(t *testing.T) { - h := http.Header{ - "Retry-After": {"1337"}, - } - assertEqual(t, retryAfter(time.Now(), h), time.Second*1337) -} - -func TestRetryAfterDateHeader(t *testing.T) { - now, _ := time.Parse(time.RFC1123, "Wed, 21 Oct 2015 07:28:00 GMT") - h := http.Header{ - "Retry-After": {"Wed, 21 Oct 2015 07:28:13 GMT"}, - } - assertEqual(t, retryAfter(now, h), time.Second*13) -} - // A testHTTPServer counts events sent to it. It requires a call to Unblock // before incrementing its internal counter and sending a response to the HTTP // client. This allows for coordinating the execution flow when needed. 
@@ -454,3 +430,83 @@ func TestKeepAlive(t *testing.T) { testKeepAlive(t, NewHTTPSyncTransport()) }) } + +func TestRateLimiting(t *testing.T) { + t.Run("AsyncTransport", func(t *testing.T) { + testRateLimiting(t, NewHTTPTransport()) + }) + t.Run("SyncTransport", func(t *testing.T) { + testRateLimiting(t, NewHTTPSyncTransport()) + }) +} + +func testRateLimiting(t *testing.T, tr Transport) { + errorEvent := &Event{} + transactionEvent := &Event{Type: transactionType} + + var errorEventCount, transactionEventCount uint64 + + writeRateLimits := func(w http.ResponseWriter, s string) { + w.Header().Add("Retry-After", "50") + w.Header().Add("X-Sentry-Rate-Limits", s) + w.WriteHeader(http.StatusTooManyRequests) + fmt.Fprint(w, `{"id":"636205708f6846c8821e6576a9d05921"}`) + } + + // Test server that simulates responses with rate limits. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + panic(err) + } + if bytes.Contains(b, []byte("transaction")) { + atomic.AddUint64(&transactionEventCount, 1) + writeRateLimits(w, "20:transaction") + } else { + atomic.AddUint64(&errorEventCount, 1) + writeRateLimits(w, "50:error") + } + })) + defer srv.Close() + + dsn := strings.Replace(srv.URL, "//", "//pubkey@", 1) + "/1" + + tr.Configure(ClientOptions{ + Dsn: dsn, + }) + + // Send several errors and transactions concurrently. + // + // Because the server always returns a rate limit for the payload type + // in the request, the expectation is that, for both errors and + // transactions, the first event is sent successfully, and then all + // others are discarded before hitting the server. 
+ var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + tr.SendEvent(errorEvent) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + tr.SendEvent(transactionEvent) + } + }() + wg.Wait() + + if !tr.Flush(time.Second) { + t.Fatal("Flush timed out") + } + + // Only one event of each kind should have hit the transport, all other + // events discarded because of rate limiting. + if n := atomic.LoadUint64(&errorEventCount); n != 1 { + t.Errorf("got errorEvent = %d, want %d", n, 1) + } + if n := atomic.LoadUint64(&transactionEventCount); n != 1 { + t.Errorf("got transactionEvent = %d, want %d", n, 1) + } +}