Skip to content

Commit

Permalink
Distributor: Add checkStartedMiddleware (#9317)
Browse files Browse the repository at this point in the history
* Distributor: Add checkStartedMiddleware

Add distributor middleware that checks if the underlying distributor
service has started and if not, blocks until it has.

During normal startup this is not necessary because Mimir will not mark
itself as ready until all its services have started.

However, if a distributor pod was already up and marked ready and then
the distributor container restarted, k8s may still consider the pod
ready before realizing that it is not and the pod may still end up
receiving push requests.

In this state the push requests may end up being serviced incorrectly
because other distributor middlewares expect their underlying services
to already have started but they may not have been.

* Update CHANGELOG.md

* Use d.State(), remove d.isServiceStarted atomic bool

* Move check into limitsMiddleware, reject instead of block

* Update CHANGELOG.md

Co-authored-by: Peter Štibraný <[email protected]>

---------

Co-authored-by: Peter Štibraný <[email protected]>
  • Loading branch information
leizor and pstibrany authored Sep 19, 2024
1 parent bbd2a24 commit d882bb7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [CHANGE] Ruler: Removed `-ruler.drain-notification-queue-on-shutdown` option, which is now enabled by default. #9115
* [CHANGE] Querier: allow wrapping errors with context errors only when the former actually correspond to `context.Canceled` and `context.DeadlineExceeded`. #9175
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280
Expand Down
9 changes: 8 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {
// The middlewares will be applied to the request (!) in the specified order, from first to last.
// To guarantee that, middleware functions will be called in reversed order, wrapping the
// result from previous call.
middlewares = append(middlewares, d.limitsMiddleware) // should run first because it checks limits before other middlewares need to read the request body
middlewares = append(middlewares, d.limitsMiddleware) // Should run first because it checks limits before other middlewares need to read the request body.
middlewares = append(middlewares, d.metricsMiddleware)
middlewares = append(middlewares, d.prePushHaDedupeMiddleware)
middlewares = append(middlewares, d.prePushRelabelMiddleware)
Expand Down Expand Up @@ -1293,6 +1293,13 @@ func (d *Distributor) cleanupAfterPushFinished(rs *requestState) {
// limitsMiddleware checks for instance limits and rejects request if this instance cannot process it at the moment.
func (d *Distributor) limitsMiddleware(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
// Make sure the distributor service and all its dependent services have been
// started. The following checks in this middleware depend on the runtime config
// service having been started.
if s := d.State(); s != services.Running {
return newUnavailableError(s)
}

// We don't know request size yet, will check it later.
ctx, rs, err := d.startPushRequest(ctx, -1)
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7903,3 +7903,79 @@ func clonePreallocTimeseries(orig mimirpb.PreallocTimeseries) (mimirpb.PreallocT

return mimirpb.PreallocTimeseries{TimeSeries: clonedSeries}, nil
}

func TestCheckStartedMiddleware(t *testing.T) {
// Create an in-memory KV store for the ring with 1 ingester registered.
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { require.NoError(t, closer.Close()) })

err := kvStore.CAS(context.Background(), ingester.IngesterRingKey,
func(_ interface{}) (interface{}, bool, error) {
d := &ring.Desc{}
d.AddIngester("ingester-1", "127.0.0.1", "", ring.NewRandomTokenGenerator().GenerateTokens(128, nil), ring.ACTIVE, time.Now(), false, time.Time{})
return d, true, nil
},
)
require.NoError(t, err)

ingestersRing, err := ring.New(ring.Config{
KVStore: kv.Config{Mock: kvStore},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 1,
}, ingester.IngesterRingKey, ingester.IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingestersRing))
})

test.Poll(t, time.Second, 1, func() interface{} {
return ingestersRing.InstancesCount()
})

var distributorConfig Config
var clientConfig client.Config
limits := validation.Limits{}
flagext.DefaultValues(&distributorConfig, &clientConfig, &limits)
distributorConfig.DistributorRing.Common.KVStore.Store = "inmemory"

limits.IngestionRate = float64(rate.Inf) // Unlimited.

distributorConfig.IngesterClientFactory = ring_client.PoolInstFunc(func(ring.InstanceDesc) (ring_client.PoolClient, error) {
return &noopIngester{}, nil
})

overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "user")
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()

_, err = distributor.Push(ctx, mimirpb.ToWriteRequest(
[][]mimirpb.LabelAdapter{
{
{
Name: "__name__",
Value: "foobar",
},
},
},
[]mimirpb.Sample{
{
TimestampMs: 1000,
Value: 100,
},
},
nil,
nil,
mimirpb.API,
))

// We expect the push request to be rejected with an unavailable error.
require.NotNil(t, err)
require.ErrorContains(t, err, "rpc error: code = Internal desc = distributor is unavailable (current state: New)")
}
19 changes: 19 additions & 0 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -358,3 +359,21 @@ func isIngestionClientError(err error) bool {

return false
}

type unavailableError struct {
state services.State
}

var _ Error = unavailableError{}

func newUnavailableError(state services.State) unavailableError {
return unavailableError{state: state}
}

func (e unavailableError) Error() string {
return fmt.Sprintf("distributor is unavailable (current state: %s)", e.state.String())
}

func (e unavailableError) Cause() mimirpb.ErrorCause {
return mimirpb.SERVICE_UNAVAILABLE
}

0 comments on commit d882bb7

Please sign in to comment.