Skip to content

Commit

Permalink
Implementing Filesystem Hooks - CMK
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed Sep 19, 2024
1 parent 8adc244 commit 819758d
Show file tree
Hide file tree
Showing 16 changed files with 660 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ func (am *MultitenantAlertmanager) setConfig(cfg alertspb.AlertConfigDesc) error
var userTemplateDir = filepath.Join(am.getTenantDirectory(cfg.User), templatesDir)
var pathsToRemove = make(map[string]struct{})

if err := am.preCreationHook(cfg.User, am.getTenantDirectory(cfg.User)); err != nil {
return err
}

// List existing files to keep track of the ones to be removed
if oldTemplateFiles, err := os.ReadDir(userTemplateDir); err == nil {
for _, file := range oldTemplateFiles {
Expand Down
19 changes: 19 additions & 0 deletions pkg/alertmanager/multitenant_cmk_extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package alertmanager

import (
"github.com/cortexproject/cortex/pkg/cmk"
logutil "github.com/cortexproject/cortex/pkg/util/log"
)

// preCreationHook runs the globally configured CMK pre-creation hook, if one
// is set, for the given tenant and tenant directory.
//
// It returns nil when no hook is configured; otherwise it returns whatever
// the hook returns.
func (am *MultitenantAlertmanager) preCreationHook(userID string, udir string) error {
	hook := cmk.Config.PreCreationHook
	if hook == nil {
		return nil
	}

	// Both callbacks are deliberate no-ops: we don't need to suspend as the
	// folder will be deleted on next sync.
	var (
		onOpen  = func() {}
		onClose = func(error) error { return nil }
	)

	return hook(userID, udir, logutil.WithUserID(userID, am.logger), onOpen, onClose)
}
15 changes: 15 additions & 0 deletions pkg/cmk/hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cmk

import (
"github.com/go-kit/log"
)

var (
// Config is the package-level filesystem hooks configuration. Components
// read Config.PreCreationHook and skip the hook entirely when it is nil.
Config FilesystemHooksConfig
)

// PreCreationHook is the signature of a hook invoked with a tenant ID and the
// tenant's folder, together with a tenant-scoped logger and two callbacks.
//
// As used by the ingester, open resumes the tenant's TSDB and close suspends
// it, recording the supplied error as the reason; the alertmanager and the
// compactor pass no-op callbacks. When (or whether) an implementation calls
// open/close is up to the hook — NOTE(review): confirm against the concrete
// hook implementation.
//
// A non-nil return value signals that the hook failed for this tenant.
type PreCreationHook func(user string, folder string, logger log.Logger, open func(), close func(err error) error) error

// FilesystemHooksConfig groups the optional filesystem hooks.
type FilesystemHooksConfig struct {
// PreCreationHook is the optional hook to run; nil disables it. The
// `yaml:"-"` tag excludes it from YAML (un)marshalling.
PreCreationHook PreCreationHook `yaml:"-"`
}
11 changes: 11 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,17 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
// out of order chunks or index file too big.
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

// Run the prehook and skip this tenant if there is an error
if err := c.preCreationHook(userID, c.compactDirForUser(userID)); err != nil {
level.Error(ulogger).Log("msg", "failed to run pre hook", "err", err)
// Deleting the folder in case of errors
if err := os.RemoveAll(c.compactDirForUser(userID)); err != nil {
level.Error(ulogger).Log("msg", "failed to remove compaction work directory", "path", c.compactDirForUser(userID), "err", err)
}

return nil
}

var blockLister block.Lister
switch cortex_tsdb.BlockDiscoveryStrategy(c.storageCfg.BucketStore.BlockDiscoveryStrategy) {
case cortex_tsdb.ConcurrentDiscovery:
Expand Down
18 changes: 18 additions & 0 deletions pkg/compactor/compactor_cmk_extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package compactor

import (
"github.com/cortexproject/cortex/pkg/cmk"
logutil "github.com/cortexproject/cortex/pkg/util/log"
)

// preCreationHook runs the globally configured CMK pre-creation hook, if one
// is set, for the given tenant and its compaction work directory.
//
// It returns nil when no hook is configured; otherwise it returns whatever
// the hook returns. The open/close callbacks handed to the hook do nothing.
func (c *Compactor) preCreationHook(userID string, udir string) error {
	hook := cmk.Config.PreCreationHook
	if hook == nil {
		return nil
	}

	tenantLogger := logutil.WithUserID(userID, c.logger)
	noopOpen := func() {}
	noopClose := func(error) error { return nil }

	return hook(userID, udir, tenantLogger, noopOpen, noopClose)
}
72 changes: 72 additions & 0 deletions pkg/compactor/compactor_cmk_extensions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package compactor

import (
"context"
"os"
"testing"
"time"

"github.com/go-kit/log"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/cmk"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/services"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

// TestCompactor_ShouldCallHooks checks that a compaction run invokes the
// configured pre-creation hook with the tenant ID and the tenant's compaction
// directory, that the directory exists while planning happens, and that it is
// gone once the run has completed.
func TestCompactor_ShouldCallHooks(t *testing.T) {
	bucketClient := &bucket.ClientMock{}
	bucketClient.MockIter("", []string{"user-1"}, nil)
	bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
	bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
	bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
	bucketClient.MockIter("user-1/markers/", nil, nil)
	bucketClient.MockIter("__markers__", nil, nil)
	bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
	bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
	bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
	bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
	bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
	bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
	bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
	bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
	bucketClient.MockIter("user-1/markers/", nil, nil)
	bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
	bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

	c, _, tsdbPlanner, _, _ := prepare(t, prepareConfig(), bucketClient, nil)
	preHookCalled := false

	// Reset hooks config when the test ends. Registering this as a cleanup
	// (instead of running it as the last statement) guarantees the global
	// hook is cleared even when an assertion below aborts the test early,
	// so it cannot leak into other tests in the package.
	t.Cleanup(func() {
		cmk.Config = cmk.FilesystemHooksConfig{}
	})

	cmk.Config.PreCreationHook = func(user string, folder string, _ log.Logger, _ func(), _ func(err error) error) error {
		require.Equal(t, "user-1", user)
		require.Equal(t, c.compactDirForUser("user-1"), folder)
		preHookCalled = true
		return nil
	}

	// Make sure the user folder is created and is being used
	// This will be called during compaction
	tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		_, err := os.Stat(c.compactDirForUser("user-1"))
		require.NoError(t, err)
	}).Return([]*metadata.Meta{}, nil)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

	// Wait until a run has completed.
	cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
		return prom_testutil.ToFloat64(c.compactionRunsCompleted)
	})
	require.True(t, preHookCalled)

	// The tenant's compaction directory must no longer exist after the run.
	_, err := os.Stat(c.compactDirForUser("user-1"))
	require.True(t, os.IsNotExist(err))
}
38 changes: 36 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"sync"
"time"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/tsdb/chunks"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
Expand All @@ -24,14 +27,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -231,6 +232,10 @@ type Ingester struct {

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker

// Suspended TSDBs
suspendedTsdbsMtx sync.RWMutex
suspendedTsdbs map[string]error
}

// Shipper interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -1066,7 +1071,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
}
}

err = i.getSuspendedTsdb(userID)

if err != nil {
return &cortexpb.WriteResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, wrapWithUser(err, userID).Error())
}

db, err := i.getOrCreateTSDB(userID, false)

var pErr *errPreHook
if errors.As(err, &pErr) {
return &cortexpb.WriteResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, wrapWithUser(pErr.cause, userID).Error())
}

if err != nil {
return nil, wrapWithUser(err, userID)
}
Expand Down Expand Up @@ -1906,6 +1923,12 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

err = i.getSuspendedTsdb(userID)

if err != nil {
return httpgrpc.Errorf(http.StatusUnprocessableEntity, wrapWithUser(err, userID).Error())
}

from, through, matchers, err := client.FromQueryRequest(req)
if err != nil {
return err
Expand Down Expand Up @@ -2089,6 +2112,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return db, nil
}

if err := i.preCreationHook(userID); err != nil {
return nil, err
}

i.stoppedMtx.Lock()
defer i.stoppedMtx.Unlock()

Expand Down Expand Up @@ -2304,6 +2331,13 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
startTime := time.Now()

db, err := i.createTSDB(userID)

var pErr *errPreHook
if errors.As(err, &pErr) {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping opening user tsdb due pre hook error", "err", pErr)
continue
}

if err != nil {
level.Error(logutil.WithContext(ctx, i.logger)).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
Expand Down
121 changes: 121 additions & 0 deletions pkg/ingester/ingester_cmk_extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package ingester

import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/cmk"
logutil "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// errPreHook marks an error as originating from the CMK pre-creation hook so
// callers can tell it apart (via errors.As) from other failures.
type errPreHook struct {
	// cause is the error returned by the hook.
	cause error
}

// Error returns the message of the underlying hook error.
func (e *errPreHook) Error() string {
	return e.cause.Error()
}

// Unwrap exposes the underlying hook error so that errors.Is and errors.As
// can inspect the cause through this wrapper.
func (e *errPreHook) Unwrap() error {
	return e.cause
}

// getSuspendedTsdb looks the tenant up in the suspended-TSDB map under the
// read lock and returns the error stored for it, or nil when the tenant has
// no entry.
func (i *Ingester) getSuspendedTsdb(userID string) error {
	i.suspendedTsdbsMtx.RLock()
	reason := i.suspendedTsdbs[userID]
	i.suspendedTsdbsMtx.RUnlock()

	return reason
}

// resumeTsdb re-creates the TSDB for a tenant, registers it in TSDBState.dbs,
// bumps the in-memory users metric and removes the tenant from the
// suspended-TSDB map.
//
// It returns true on success. When the TSDB cannot be created it logs the
// failure and returns false, leaving the tenant's suspended entry untouched.
func (i *Ingester) resumeTsdb(userID string) bool {
	// Create the database and a shipper for a user
	db, err := i.createTSDB(userID)
	if err != nil {
		// Callers may discard the boolean result, so surface the failure here
		// instead of dropping it silently.
		level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to resume suspended TSDB", "err", err)
		return false
	}

	i.stoppedMtx.Lock()
	i.TSDBState.dbs[userID] = db
	i.stoppedMtx.Unlock()
	i.metrics.memUsers.Inc()

	// delete on a nil map is a no-op, so the map needs no lazy initialisation here.
	i.suspendedTsdbsMtx.Lock()
	delete(i.suspendedTsdbs, userID)
	i.suspendedTsdbsMtx.Unlock()

	return true
}

// preCreationHook runs the globally configured CMK pre-creation hook, if one
// is set, for the tenant's blocks directory.
//
// The hook receives callbacks that resume (open) and suspend (close) the
// tenant's TSDB. If the hook itself fails, the tenant is suspended with the
// hook error and that error is returned wrapped in *errPreHook. It returns
// nil when no hook is configured or the hook succeeds.
func (i *Ingester) preCreationHook(userID string) error {
	udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID)

	hook := cmk.Config.PreCreationHook
	if hook == nil {
		return nil
	}

	userLogger := logutil.WithUserID(userID, i.logger)
	resume := func() { i.resumeTsdb(userID) }
	suspend := func(cause error) error { return i.suspendTsdb(userID, cause) }

	hookErr := hook(userID, udir, userLogger, resume, suspend)
	if hookErr == nil {
		return nil
	}

	// A failure to suspend is only logged; the hook error is what gets returned.
	if suspendErr := i.suspendTsdb(userID, hookErr); suspendErr != nil {
		level.Warn(userLogger).Log("msg", "error suspending cmk workspace", "err", suspendErr)
	}

	return &errPreHook{cause: hookErr}
}

// suspendTsdb marks the tenant as suspended, storing err as the reason, and
// then closes and unregisters the tenant's TSDB if one is currently open.
//
// It returns nil when the tenant has no open TSDB or when the TSDB was closed
// and cleaned up. It returns an error when the TSDB could not be moved from
// the active to the closing state, or when closing it failed. In both error
// cases the tenant's entry in the suspended map has already been written.
func (i *Ingester) suspendTsdb(userID string, err error) error {
userLogger := logutil.WithUserID(userID, i.logger)
// Record the user in the suspended TSDB map (created lazily on first use).
i.suspendedTsdbsMtx.Lock()
if i.suspendedTsdbs == nil {
i.suspendedTsdbs = map[string]error{}
}
i.suspendedTsdbs[userID] = err
i.suspendedTsdbsMtx.Unlock()

userDB := i.getTSDB(userID)

// Nothing to close when the tenant has no TSDB open.
if userDB == nil || userDB.db == nil {
return nil
}
// This disables pushes and force-compactions. Not allowed to close while shipping is in progress.
// NOTE(review): the "force compaction" message is returned whenever the
// active -> closing transition fails — confirm the wording is intended.
if !userDB.casState(active, closing) {
return errors.New("force compaction")
}

// If TSDB is fully closed, we will set state to 'closed', which will prevent this deferred closing -> active transition.
defer userDB.casState(closing, active)

// Make sure we don't ignore any possible inflight pushes.
userDB.pushesInFlight.Wait()

// The inner err shadows the function parameter within this if statement only.
if err := userDB.Close(); err != nil {
level.Error(userLogger).Log("msg", "failed to close idle TSDB", "err", err)
return err
}

// This will prevent going back to "active" state in deferred statement.
userDB.casState(closing, closed)

// Only remove the user from TSDBState when everything is cleaned up.
// This prevents concurrency problems when Cortex is trying to open a new TSDB, i.e. a new request for a given tenant
// comes in while the TSDB for the same tenant is being closed.
// If that happens now, the request will get rejected as the push will not be able to acquire the lock, since the TSDB will be
// in closed state.
defer func() {
i.stoppedMtx.Lock()
delete(i.TSDBState.dbs, userDB.userID)
userDB.db = nil
i.stoppedMtx.Unlock()
}()

// Drop the per-tenant metrics and metadata now that the TSDB is closed.
i.metrics.memUsers.Dec()
i.TSDBState.tsdbMetrics.removeRegistryForUser(userDB.userID)

i.deleteUserMetadata(userDB.userID)
i.metrics.deletePerUserMetrics(userDB.userID)

validation.DeletePerUserValidationMetrics(i.validateMetrics, userDB.userID, i.logger)

return nil
}
17 changes: 17 additions & 0 deletions pkg/ingester/ingester_cmk_extensions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ingester

import (
"testing"

"github.com/pkg/errors"

"github.com/stretchr/testify/require"
)

// Test_preHookError verifies that an *errPreHook can be matched with
// errors.As and that its message is the message of the wrapped cause.
func Test_preHookError(t *testing.T) {
	err := errors.New("internalError")
	var pErr *errPreHook
	preHookError := &errPreHook{cause: err}
	require.True(t, errors.As(preHookError, &pErr))
	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, "internalError", pErr.Error())
}
Loading

0 comments on commit 819758d

Please sign in to comment.