Skip to content

Commit

Permalink
[AlertManager] Handling CMK AccessDenied errors (cortexproject#5542)
Browse files Browse the repository at this point in the history
* Handling access denied errors

Signed-off-by: Alan Protasio <[email protected]>

* changelog

Signed-off-by: Alan Protasio <[email protected]>

* lint

Signed-off-by: Alan Protasio <[email protected]>

---------

Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot authored Sep 5, 2023
1 parent 63b372c commit 7b9db50
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
3 changes: 2 additions & 1 deletion pkg/alertmanager/alertspb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package alertspb
import "errors"

var (
ErrNotFound = errors.New("alertmanager storage object not found")
ErrNotFound = errors.New("alertmanager storage object not found")
ErrAccessDenied = errors.New("alertmanager storage object access denied")
)

// ToProto transforms a yaml Alertmanager config and map of template files to an AlertConfigDesc
Expand Down
30 changes: 21 additions & 9 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/pkg/errors"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -76,8 +78,8 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string
err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error {
userID := job.(string)

cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
cfg, uBucket, err := s.getAlertConfig(ctx, userID)
if uBucket.IsObjNotFoundErr(err) || uBucket.IsAccessDeniedErr(err) {
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to fetch alertmanager config for user %s", userID)
Expand All @@ -95,11 +97,15 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string

// GetAlertConfig implements alertstore.AlertStore.
func (s *BucketAlertStore) GetAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, error) {
cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
cfg, uBucket, err := s.getAlertConfig(ctx, userID)
if uBucket.IsObjNotFoundErr(err) {
return cfg, alertspb.ErrNotFound
}

if uBucket.IsAccessDeniedErr(err) {
return cfg, alertspb.ErrAccessDenied
}

return cfg, err
}

Expand Down Expand Up @@ -142,10 +148,14 @@ func (s *BucketAlertStore) GetFullState(ctx context.Context, userID string) (ale
fs := alertspb.FullStateDesc{}

err := s.get(ctx, bkt, fullStateName, &fs)
if s.amBucket.IsObjNotFoundErr(err) {
if bkt.IsObjNotFoundErr(err) {
return fs, alertspb.ErrNotFound
}

if bkt.IsAccessDeniedErr(err) {
return fs, alertspb.ErrAccessDenied
}

return fs, err
}

Expand All @@ -172,10 +182,11 @@ func (s *BucketAlertStore) DeleteFullState(ctx context.Context, userID string) e
return err
}

func (s *BucketAlertStore) getAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, error) {
func (s *BucketAlertStore) getAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, objstore.Bucket, error) {
config := alertspb.AlertConfigDesc{}
err := s.get(ctx, s.getUserBucket(userID), userID, &config)
return config, err
userBkt := s.getUserBucket(userID)
err := s.get(ctx, userBkt, userID, &config)
return config, userBkt, err
}

func (s *BucketAlertStore) get(ctx context.Context, bkt objstore.Bucket, name string, msg proto.Message) error {
Expand Down Expand Up @@ -205,5 +216,6 @@ func (s *BucketAlertStore) getUserBucket(userID string) objstore.Bucket {
}

func (s *BucketAlertStore) getAlertmanagerUserBucket(userID string) objstore.Bucket {
return bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider).WithExpectedErrs(s.amBucket.IsObjNotFoundErr)
uBucket := bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider)
return uBucket.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
}
71 changes: 61 additions & 10 deletions pkg/alertmanager/alertstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package alertstore

import (
"context"
"fmt"
"io"
"testing"

"github.com/go-kit/log"
Expand All @@ -14,8 +16,12 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
)

var (
errAccessDenied = fmt.Errorf("access denied")
)

func TestAlertStore_ListAllUsers(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -40,7 +46,7 @@ func TestAlertStore_ListAllUsers(t *testing.T) {
}

func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand Down Expand Up @@ -78,7 +84,7 @@ func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
}

func TestStore_GetAlertConfigs(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -90,6 +96,15 @@ func TestStore_GetAlertConfigs(t *testing.T) {
assert.Empty(t, configs)
}

// Treat Access denied as empty.
{
m.err = errAccessDenied
configs, err := store.GetAlertConfigs(ctx, []string{"user-1", "user-2"})
require.NoError(t, err)
assert.Empty(t, configs)
m.err = nil
}

// The storage contains some configs.
{
require.NoError(t, store.SetAlertConfig(ctx, user1Cfg))
Expand All @@ -114,7 +129,7 @@ func TestStore_GetAlertConfigs(t *testing.T) {
}

func TestAlertStore_DeleteAlertConfig(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -132,6 +147,12 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, user2Cfg, config)

// Ensure we capture access denied error
m.err = errAccessDenied
_, err = store.GetAlertConfig(ctx, "user-1")
assert.Equal(t, alertspb.ErrAccessDenied, err)
m.err = nil

// Delete the config for user-1.
require.NoError(t, store.DeleteAlertConfig(ctx, "user-1"))

Expand All @@ -148,26 +169,27 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
})
}

func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, client interface{})) {
func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, b *mockBucket, client interface{})) {
bucketClient := objstore.NewInMemBucket()
bucketStore := bucketclient.NewBucketAlertStore(bucketClient, nil, log.NewNopLogger())
mBucketClient := &mockBucket{Bucket: bucketClient}
bucketStore := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())

stores := map[string]struct {
store AlertStore
client interface{}
}{
"bucket": {store: bucketStore, client: bucketClient},
"bucket": {store: bucketStore, client: mBucketClient},
}

for name, data := range stores {
t.Run(name, func(t *testing.T) {
testFn(t, data.store, data.client)
testFn(t, data.store, mBucketClient, data.client)
})
}
}

func objectExists(bucketClient interface{}, key string) (bool, error) {
if typed, ok := bucketClient.(*objstore.InMemBucket); ok {
if typed, ok := bucketClient.(objstore.Bucket); ok {
return typed.Exists(context.Background(), key)
}

Expand All @@ -189,7 +211,8 @@ func makeTestFullState(content string) alertspb.FullStateDesc {

func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
bucket := objstore.NewInMemBucket()
store := bucketclient.NewBucketAlertStore(bucket, nil, log.NewNopLogger())
mBucketClient := &mockBucket{Bucket: bucket}
store := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
ctx := context.Background()

state1 := makeTestFullState("one")
Expand All @@ -208,6 +231,18 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
assert.ElementsMatch(t, []string{}, users)
}

// Test Access Denied
{
mBucketClient.err = errAccessDenied
_, err := store.GetFullState(ctx, "user-1")
assert.Equal(t, alertspb.ErrAccessDenied, err)

users, err := store.ListUsersWithFullState(ctx)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{}, users)
mBucketClient.err = nil
}

// The storage contains users.
{
require.NoError(t, store.SetFullState(ctx, "user-1", state1))
Expand Down Expand Up @@ -256,3 +291,19 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
require.NoError(t, store.DeleteFullState(ctx, "user-1"))
}
}

type mockBucket struct {
objstore.Bucket
err error
}

func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if m.err != nil {
return nil, m.err
}
return m.Bucket.Get(ctx, name)
}

func (m *mockBucket) IsAccessDeniedErr(err error) bool {
return err == errAccessDenied
}
9 changes: 6 additions & 3 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.

cfg, err := am.store.GetAlertConfig(r.Context(), userID)
if err != nil {
if err == alertspb.ErrNotFound {
switch {
case err == alertspb.ErrNotFound:
http.Error(w, err.Error(), http.StatusNotFound)
} else {
case err == alertspb.ErrAccessDenied:
http.Error(w, err.Error(), http.StatusForbidden)
default:
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
Expand Down Expand Up @@ -285,7 +288,7 @@ func (am *MultitenantAlertmanager) ListAllConfigs(w http.ResponseWriter, r *http

err = concurrency.ForEachUser(r.Context(), userIDs, fetchConcurrency, func(ctx context.Context, userID string) error {
cfg, err := am.store.GetAlertConfig(ctx, userID)
if errors.Is(err, alertspb.ErrNotFound) {
if errors.Is(err, alertspb.ErrNotFound) || errors.Is(err, alertspb.ErrAccessDenied) {
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to fetch alertmanager config for user %s", userID)
Expand Down
6 changes: 6 additions & 0 deletions pkg/alertmanager/state_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
syncFromReplica = "from-replica"
syncFromStorage = "from-storage"
syncUserNotFound = "user-not-found"
syncAccessDenied = "user-access-denied"
syncFailed = "failed"
)

Expand Down Expand Up @@ -232,6 +233,11 @@ func (s *state) starting(ctx context.Context) error {
s.initialSyncCompleted.WithLabelValues(syncUserNotFound).Inc()
return nil
}
if errors.Is(err, alertspb.ErrAccessDenied) {
level.Info(s.logger).Log("msg", "access deinied when trying to access user storage; proceeding", "user", s.userID)
s.initialSyncCompleted.WithLabelValues(syncAccessDenied).Inc()
return nil
}
if err == nil {
if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
level.Info(s.logger).Log("msg", "state read from storage; proceeding")
Expand Down

0 comments on commit 7b9db50

Please sign in to comment.