Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify branch protect interface #6688

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,11 +1821,7 @@ func (c *Controller) SetBranchProtectionRules(w http.ResponseWriter, r *http.Req
Value: blockedActions,
}
}
eTag := params.IfMatch
if swag.StringValue(eTag) == "" {
eTag = nil
}
err := c.Catalog.SetBranchProtectionRules(ctx, repository, rules, eTag)
err := c.Catalog.SetBranchProtectionRules(ctx, repository, rules, params.IfMatch)
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down Expand Up @@ -3092,7 +3088,7 @@ func (c *Controller) InternalDeleteBranchProtectionRule(w http.ResponseWriter, r
for p := range rules.BranchPatternToBlockedActions {
if p == body.Pattern {
delete(rules.BranchPatternToBlockedActions, p)
err = c.Catalog.SetBranchProtectionRules(ctx, repository, rules, swag.String(graveler.BranchProtectionSkipValidationChecksum))
err = c.Catalog.SetBranchProtectionRules(ctx, repository, rules, nil)
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down Expand Up @@ -3146,7 +3142,7 @@ func (c *Controller) InternalCreateBranchProtectionRule(w http.ResponseWriter, r
Value: []graveler.BranchProtectionBlockedAction{graveler.BranchProtectionBlockedAction_STAGING_WRITE, graveler.BranchProtectionBlockedAction_COMMIT},
}
rules.BranchPatternToBlockedActions[body.Pattern] = blockedActions
err = c.Catalog.SetBranchProtectionRules(ctx, repository, rules, swag.String(graveler.BranchProtectionSkipValidationChecksum))
err = c.Catalog.SetBranchProtectionRules(ctx, repository, rules, nil)
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/catalog/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ type Interface interface {
// The returned checksum represents the current state of the rules, and can be passed to SetBranchProtectionRules for conditional updates.
GetBranchProtectionRules(ctx context.Context, repositoryID string) (*graveler.BranchProtectionRules, *string, error)
// SetBranchProtectionRules sets the branch protection rules for the given repository.
// If lastKnownChecksum doesn't match the current state, the update will fail with ErrPreconditionFailed.
// If lastKnownChecksum is nil, the update is performed only if no rules exist.
// If lastKnownChecksum is equal to BranchProtectionSkipValidationChecksum, the update is always performed.
// If lastKnownChecksum doesn't match the current state, the update fails with ErrPreconditionFailed.
// If lastKnownChecksum is the empty string, the update is performed only if no rules exist.
// If lastKnownChecksum is nil, the update is performed unconditionally.
SetBranchProtectionRules(ctx context.Context, repositoryID string, rules *graveler.BranchProtectionRules, lastKnownChecksum *string) error

// SetLinkAddress to validate single use limited in time of a given physical address
Expand Down
34 changes: 8 additions & 26 deletions pkg/graveler/branch/protection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package branch
import (
"context"
"errors"
"fmt"
"time"

"github.com/gobwas/glob"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/settings"
"github.com/treeverse/lakefs/pkg/kv"
"google.golang.org/protobuf/proto"
)

const ProtectionSettingKey = "protected_branches"
Expand All @@ -22,10 +19,6 @@ const (
matcherCacheJitter = 1 * time.Minute
)

var (
ErrRuleNotExists = fmt.Errorf("branch protection rule does not exist: %w", graveler.ErrNotFound)
)

type ProtectionManager struct {
settingManager *settings.Manager
matchers cache.Cache
Expand All @@ -36,39 +29,28 @@ func NewProtectionManager(settingManager *settings.Manager) *ProtectionManager {
}

func (m *ProtectionManager) GetRules(ctx context.Context, repository *graveler.RepositoryRecord) (*graveler.BranchProtectionRules, *string, error) {
rulesMsg, checksum, err := m.settingManager.GetLatest(ctx, repository, ProtectionSettingKey, &graveler.BranchProtectionRules{})
if errors.Is(err, graveler.ErrNotFound) {
return &graveler.BranchProtectionRules{}, nil, nil
}
rulesMsg := &graveler.BranchProtectionRules{}
checksum, err := m.settingManager.GetLatest(ctx, repository, ProtectionSettingKey, rulesMsg)
if err != nil {
return nil, nil, err
}
if proto.Size(rulesMsg) == 0 {
return &graveler.BranchProtectionRules{}, nil, nil
}
return rulesMsg.(*graveler.BranchProtectionRules), checksum, nil
}
func (m *ProtectionManager) SetRules(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.BranchProtectionRules) error {
return m.settingManager.Save(ctx, repository, ProtectionSettingKey, rules)
return rulesMsg, checksum, nil
}

func (m *ProtectionManager) SetRulesIf(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.BranchProtectionRules, lastKnownChecksum *string) error {
err := m.settingManager.SaveIf(ctx, repository, ProtectionSettingKey, rules, lastKnownChecksum)
if errors.Is(err, kv.ErrPredicateFailed) {
return graveler.ErrPreconditionFailed
}
return err
func (m *ProtectionManager) SetRules(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.BranchProtectionRules, lastKnownChecksum *string) error {
nopcoder marked this conversation as resolved.
Show resolved Hide resolved
return m.settingManager.Save(ctx, repository, ProtectionSettingKey, rules, lastKnownChecksum)
}

func (m *ProtectionManager) IsBlocked(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, action graveler.BranchProtectionBlockedAction) (bool, error) {
rules, err := m.settingManager.Get(ctx, repository, ProtectionSettingKey, &graveler.BranchProtectionRules{})
rules := &graveler.BranchProtectionRules{}
err := m.settingManager.Get(ctx, repository, ProtectionSettingKey, rules)
if errors.Is(err, graveler.ErrNotFound) {
return false, nil
}
if err != nil {
return false, err
}
for pattern, blockedActions := range rules.(*graveler.BranchProtectionRules).BranchPatternToBlockedActions {
for pattern, blockedActions := range rules.BranchPatternToBlockedActions {
pattern := pattern
matcher, err := m.matchers.GetOrSet(pattern, func() (v interface{}, err error) {
return glob.Compile(pattern)
Expand Down
6 changes: 3 additions & 3 deletions pkg/graveler/branch/protection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestSetAndGet(t *testing.T) {
if len(rules.BranchPatternToBlockedActions) != 0 {
t.Fatalf("expected no rules, got %d rules", len(rules.BranchPatternToBlockedActions))
}
testutil.Must(t, bpm.SetRulesIf(ctx, repository, &graveler.BranchProtectionRules{
testutil.Must(t, bpm.SetRules(ctx, repository, &graveler.BranchProtectionRules{
BranchPatternToBlockedActions: map[string]*graveler.BranchProtectionBlockedActions{
"main*": {Value: []graveler.BranchProtectionBlockedAction{
graveler.BranchProtectionBlockedAction_STAGING_WRITE},
Expand All @@ -57,7 +57,7 @@ func TestSetAndGet(t *testing.T) {
func TestSetWrongETag(t *testing.T) {
ctx := context.Background()
bpm := prepareTest(t, ctx)
err := bpm.SetRulesIf(ctx, repository, &graveler.BranchProtectionRules{
err := bpm.SetRules(ctx, repository, &graveler.BranchProtectionRules{
BranchPatternToBlockedActions: map[string]*graveler.BranchProtectionBlockedActions{
"main*": {Value: []graveler.BranchProtectionBlockedAction{
graveler.BranchProtectionBlockedAction_STAGING_WRITE},
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestIsBlocked(t *testing.T) {
bpm := prepareTest(t, ctx)
testutil.Must(t, bpm.SetRules(ctx, repository, &graveler.BranchProtectionRules{
BranchPatternToBlockedActions: tst.patternToBlockedActions,
}))
}, nil))

for branchID, expectedBlockedActions := range tst.expectedBlockedActions {
for _, action := range expectedBlockedActions.Value {
Expand Down
21 changes: 7 additions & 14 deletions pkg/graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-openapi/swag"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,8 +45,6 @@ const (
RepoMetadataUpdateRandomFactor = 0.5
)

const BranchProtectionSkipValidationChecksum = "skip-checksum-validation"

// Basic Types

// DiffType represents the type of the change
Expand Down Expand Up @@ -618,8 +615,8 @@ type VersionController interface {

// SetBranchProtectionRules sets the branch protection rules for the repository.
// If lastKnownChecksum doesn't match the current state, the update fails with ErrPreconditionFailed.
// If lastKnownChecksum is nil, the update is performed only if no rules exist.
// If lastKnownChecksum is equal to BranchProtectionSkipValidationChecksum, the update is always performed.
// If lastKnownChecksum is the empty string, the update is performed only if no rules exist.
// If lastKnownChecksum is nil, the update is performed unconditionally.
SetBranchProtectionRules(ctx context.Context, repository *RepositoryRecord, rules *BranchProtectionRules, lastKnownChecksum *string) error

// SetLinkAddress stores the address token under the repository. The token will be valid for addressTokenTime.
Expand Down Expand Up @@ -1502,11 +1499,7 @@ func (g *Graveler) GetBranchProtectionRules(ctx context.Context, repository *Rep
}

func (g *Graveler) SetBranchProtectionRules(ctx context.Context, repository *RepositoryRecord, rules *BranchProtectionRules, lastKnownChecksum *string) error {
if swag.StringValue(lastKnownChecksum) == BranchProtectionSkipValidationChecksum {
// TODO(johnnyaug): remove this logic and constant once the legacy API is dropped.
return g.protectedBranchesManager.SetRules(ctx, repository, rules)
}
return g.protectedBranchesManager.SetRulesIf(ctx, repository, rules, lastKnownChecksum)
return g.protectedBranchesManager.SetRules(ctx, repository, rules, lastKnownChecksum)
}

// getFromStagingArea returns the most updated value of a given key in a branch staging area.
Expand Down Expand Up @@ -3243,11 +3236,11 @@ type ProtectedBranchesManager interface {
// GetRules returns all branch protection rules for the repository.
// The returned checksum represents the current state of the rules, and can be passed to SetRulesIf for conditional updates.
GetRules(ctx context.Context, repository *RepositoryRecord) (*BranchProtectionRules, *string, error)
SetRules(ctx context.Context, repository *RepositoryRecord, rules *BranchProtectionRules) error
// SetRulesIf sets the branch protection rules for the repository.
// SetRules sets the branch protection rules for the repository.
// If lastKnownChecksum does not match the current checksum, returns ErrPreconditionFailed.
// If lastKnownChecksum is nil, the rules are set only if no rules are currently set.
SetRulesIf(ctx context.Context, repository *RepositoryRecord, rules *BranchProtectionRules, lastKnownChecksum *string) error
// If lastKnownChecksum is the empty string, the rules are set only if they are not currently set.
// If lastKnownChecksum is nil, the rules are set unconditionally.
SetRules(ctx context.Context, repository *RepositoryRecord, rules *BranchProtectionRules, lastKnownChecksum *string) error
// IsBlocked returns whether the action is blocked by any branch protection rule matching the given branch.
IsBlocked(ctx context.Context, repository *RepositoryRecord, branchID BranchID, action BranchProtectionBlockedAction) (bool, error)
}
Expand Down
22 changes: 4 additions & 18 deletions pkg/graveler/mock/graveler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 47 additions & 43 deletions pkg/graveler/settings/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,44 +59,47 @@ func NewManager(refManager graveler.RefManager, store kv.Store, options ...Manag
return m
}

// Save persists the given setting under the given repository and key. Overrides settings key in KV Store
func (m *Manager) Save(ctx context.Context, repository *graveler.RepositoryRecord, key string, setting proto.Message) error {
logSetting(logging.FromContext(ctx), repository.RepositoryID, key, setting, "saving repository-level setting")
// Write key in KV Store
return kv.SetMsg(ctx, m.store, graveler.RepoPartition(repository), []byte(graveler.SettingsPath(key)), setting)
}

// SaveIf persists the given setting under the given repository and key. Overrides settings key in KV Store.
// Save persists the given setting under the given repository and key. Overrides settings key in KV Store.
// The setting is persisted only if the current version of the setting matches the given checksum.
// If lastKnownChecksum is nil, the setting is persisted only if it does not exist.
func (m *Manager) SaveIf(ctx context.Context, repository *graveler.RepositoryRecord, key string, setting proto.Message, lastKnownChecksum *string) error {
// If lastKnownChecksum is the empty string, the setting is persisted only if it does not exist.
// If lastKnownChecksum is nil, the setting is persisted unconditionally.
func (m *Manager) Save(ctx context.Context, repository *graveler.RepositoryRecord, key string, setting proto.Message, lastKnownChecksum *string) error {
logSetting(logging.FromContext(ctx), repository.RepositoryID, key, setting, "saving repository-level setting")
if lastKnownChecksum == nil {
return kv.SetMsg(ctx, m.store, graveler.RepoPartition(repository), []byte(graveler.SettingsPath(key)), setting)
}
if *lastKnownChecksum == "" {
err := kv.SetMsgIf(ctx, m.store, graveler.RepoPartition(repository), []byte(graveler.SettingsPath(key)), setting, nil)
if errors.Is(err, kv.ErrPredicateFailed) {
return graveler.ErrPreconditionFailed
}
return err
}
valueWithPredicate, err := m.store.Get(ctx, []byte(graveler.RepoPartition(repository)), []byte(graveler.SettingsPath(key)))
if err != nil && !errors.Is(err, kv.ErrNotFound) {
if errors.Is(err, kv.ErrNotFound) {
return graveler.ErrPreconditionFailed
}
if err != nil {
return err
}
var currentChecksum *string
var currentPredicate kv.Predicate
if valueWithPredicate != nil {
if valueWithPredicate.Value != nil {
currentChecksum, err = computeChecksum(valueWithPredicate.Value)
}
if err != nil {
return err
}
currentPredicate = valueWithPredicate.Predicate
currentChecksum, err := computeChecksum(valueWithPredicate.Value)
if err != nil {
return err
}
if swag.StringValue(currentChecksum) != swag.StringValue(lastKnownChecksum) {
if *currentChecksum != *lastKnownChecksum {
return graveler.ErrPreconditionFailed
}
err = kv.SetMsgIf(ctx, m.store, graveler.RepoPartition(repository), []byte(graveler.SettingsPath(key)), setting, currentPredicate)
if err != nil && errors.Is(err, kv.ErrPredicateFailed) {
err = kv.SetMsgIf(ctx, m.store, graveler.RepoPartition(repository), []byte(graveler.SettingsPath(key)), setting, valueWithPredicate.Predicate)
if errors.Is(err, kv.ErrPredicateFailed) {
return graveler.ErrPreconditionFailed
}
return err
}

func computeChecksum(value []byte) (*string, error) {
if value == nil {
return nil, graveler.ErrInvalidValue
}
h := sha256.New()
_, err := h.Write(value)
if err != nil {
Expand All @@ -105,52 +108,53 @@ func computeChecksum(value []byte) (*string, error) {
return swag.String(hex.EncodeToString(h.Sum(nil))), nil
}

// GetLatest returns the latest setting under the given repository and key, without using the cache.
// The returned checksum represents the version of the setting, and can be passed to SaveIf for conditional updates.
func (m *Manager) GetLatest(ctx context.Context, repository *graveler.RepositoryRecord, key string, settingTemplate proto.Message) (proto.Message, *string, error) {
// GetLatest loads the latest setting into dst.
// It returns a checksum representing the version of the setting, which can be passed to SaveIf for conditional updates.
// The checksum of a non-existent setting is the empty string.
func (m *Manager) GetLatest(ctx context.Context, repository *graveler.RepositoryRecord, key string, dst proto.Message) (*string, error) {
settings, err := m.store.Get(ctx, []byte(graveler.RepoPartition(repository)), []byte(graveler.SettingsPath(key)))
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
err = graveler.ErrNotFound
}
return nil, nil, err
return nil, err
}
checksum, err := computeChecksum(settings.Value)
if err != nil {
return nil, nil, err
return nil, err
}
data := settingTemplate.ProtoReflect().Interface()
err = proto.Unmarshal(settings.Value, data)
err = proto.Unmarshal(settings.Value, dst)
if err != nil {
return nil, nil, err
return nil, err
}
logSetting(logging.FromContext(ctx), repository.RepositoryID, key, data, "got repository-level setting")
return data, checksum, nil
logSetting(logging.FromContext(ctx), repository.RepositoryID, key, dst, "got repository-level setting")
return checksum, nil
}

// Get fetches the setting under the given repository and key, and returns the result.
// Get fetches the setting under the given repository and key, and loads it into dst.
// The result is eventually consistent: it is not guaranteed to be the most up-to-date setting. The cache expiry period is 1 second.
// The settingTemplate parameter is used to determine the returned type.
// The returned checksum represents the version of the setting, and can be used in SaveIf to perform an atomic update.
func (m *Manager) Get(ctx context.Context, repository *graveler.RepositoryRecord, key string, settingTemplate proto.Message) (proto.Message, error) {
func (m *Manager) Get(ctx context.Context, repository *graveler.RepositoryRecord, key string, dst proto.Message) error {
k := cacheKey{
RepositoryID: repository.RepositoryID,
Key: key,
}
tmp := proto.Clone(dst)
setting, err := m.cache.GetOrSet(k, func() (v interface{}, err error) {
setting, _, err := m.GetLatest(ctx, repository, key, settingTemplate)
_, err = m.GetLatest(ctx, repository, key, tmp)
if errors.Is(err, graveler.ErrNotFound) {
// do not return this error here, so that empty settings are cached
return nil, nil
}
return setting, err
return tmp, err
})
if err != nil {
return nil, err
return err
}
if setting == nil {
return nil, graveler.ErrNotFound
return graveler.ErrNotFound
}
return setting.(proto.Message), nil
proto.Merge(dst, setting.(proto.Message))
return nil
}

func logSetting(logger logging.Logger, repositoryID graveler.RepositoryID, key string, setting proto.Message, logMsg string) {
Expand Down
Loading
Loading