Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More robust system Tier creation / update #6696

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) {

// Install a post start hook to initialize Tiers on start-up
s.AddPostStartHook("initialize-tiers", func(context genericapiserver.PostStartHookContext) error {
go c.networkPolicyController.InitializeTiers()
go func() {
// context gets cancelled when the server stops.
if err := c.networkPolicyController.InitializeTiers(context); err != nil {
klog.ErrorS(err, "Failed to initialize system Tiers")
}
}()
return nil
})
}
Expand Down
140 changes: 74 additions & 66 deletions pkg/controller/networkpolicy/tier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package networkpolicy

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
Expand Down Expand Up @@ -118,83 +120,89 @@ var (
// will first attempt to retrieve the Tier by it's name from K8s and if missing,
// create the CR. InitializeTiers will be called as part of a Post-Start hook
// of antrea-controller's APIServer.
func (n *NetworkPolicyController) InitializeTiers() {
func (n *NetworkPolicyController) InitializeTiers(ctx context.Context) error {
if !cache.WaitForCacheSync(ctx.Done(), n.tierListerSynced) {
// This happens when Done is closed because we are shutting down.
return fmt.Errorf("caches not synced for system Tier initialization")
}
for _, t := range systemGeneratedTiers {
// Check if Tier is already present.
oldTier, err := n.tierLister.Get(t.Name)
if err == nil {
// Tier is already present.
klog.V(2).Infof("%s Tier already created", t.Name)
// Update Tier Priority if it is not set to desired Priority.
expPrio := priorityMap[t.Name]
if oldTier.Spec.Priority != expPrio {
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
n.updateTier(tToUpdate)
}
continue
if err := n.initializeTier(ctx, t); err != nil {
return err
}
n.initTier(t)
}
return nil
}

// initTier attempts to create system Tiers until they are created using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) initTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
func (n *NetworkPolicyController) initializeTier(ctx context.Context, t *secv1beta1.Tier) error {
// Tier creation or update may fail because antrea APIService is not yet ready to accept
// requests for validation. We will keep retrying until it succeeds, using an exponential
// backoff (not exceeding 8s), unless the context is cancelled.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Jitter: 0.0,
Steps: 3, // max duration of 8s
}
retryAttempt := 1
for {
klog.V(2).InfoS("Creating system Tier", "tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Create(context.TODO(), t, metav1.CreateOptions{})
// Attempt to recreate Tier after a backoff only if it does not exist.
if err != nil {
if errors.IsAlreadyExists(err) {
klog.InfoS("System Tier already exists", "tier", t.Name)
return
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if success := func() bool {
// Check if Tier is already present.
if oldTier, err := n.tierLister.Get(t.Name); err == nil {
// Tier is already present.
klog.V(2).InfoS("Tier already exists", "tier", klog.KObj(t))
// Update Tier Priority if it is not set to desired Priority.
expPrio := t.Spec.Priority
if oldTier.Spec.Priority == expPrio {
return true
}
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
if err := n.updateTier(ctx, tToUpdate); err != nil {
klog.InfoS("Failed to update system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
return true
}
klog.InfoS("Failed to create system Tier on init, will retry", "tier", t.Name, "attempts", retryAttempt, "err", err)
// Tier creation may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
if err := n.createTier(ctx, t); err != nil {
// Error may be that the Tier already exists, in this case, we will
// call tierLister.Get again and compare priorities.
klog.InfoS("Failed to create system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
retryAttempt += 1
continue
return true
}(); success {
break
}
retryAttempt += 1
waitBeforeRetry := backoff.Step()
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(waitBeforeRetry):
}
klog.InfoS("Created system Tier", "tier", t.Name)
return
}
return nil
}

// updateTier attempts to update Tiers using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) updateTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
retryAttempt := 1
for {
klog.V(2).Infof("Updating %s Tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Update(context.TODO(), t, metav1.UpdateOptions{})
// Attempt to update Tier after a backoff.
if err != nil {
klog.Warningf("Failed to update %s Tier on init: %v. Retry attempt: %d", t.Name, err, retryAttempt)
// Tier update may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
}
retryAttempt += 1
continue
}
return
func (n *NetworkPolicyController) createTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Creating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Create(ctx, t, metav1.CreateOptions{}); err != nil {
return err
}
klog.InfoS("Created system Tier", "tier", klog.KObj(t))
return nil
}

func (n *NetworkPolicyController) updateTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Updating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Update(ctx, t, metav1.UpdateOptions{}); err != nil {
return err
}
klog.InfoS("Updated system Tier", "tier", klog.KObj(t))
return nil
}
126 changes: 96 additions & 30 deletions pkg/controller/networkpolicy/tier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package networkpolicy

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,61 +30,124 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned/fake"
)

func TestInitTier(t *testing.T) {
testTier := &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: 10,
},
func TestInitializeTier(t *testing.T) {
makeTestTier := func(priority int32) *secv1beta1.Tier {
return &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: priority,
},
}
}
testTier := makeTestTier(10)

tests := []struct {
name string
reactor k8stesting.ReactionFunc
expectedCalled int
name string
createReactor k8stesting.ReactionFunc
updateReactor k8stesting.ReactionFunc
existingTier *secv1beta1.Tier
createExpectedCalls int
updateExpectedCalls int
}{
{
name: "create successfully",
expectedCalled: 1,
name: "create successful",
createExpectedCalls: 1,
},
{
name: "tier already exists",
reactor: func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewAlreadyExists(action.GetResource().GroupResource(), testTier.Name)
},
expectedCalled: 1,
name: "create error",
createReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
createExpectedCalls: 2,
},
{
name: "transient error",
reactor: func() k8stesting.ReactionFunc {
name: "update successful",
existingTier: makeTestTier(5),
updateExpectedCalls: 1,
},
{
name: "update error",
updateReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
maxFailureCount := 1
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < maxFailureCount {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
expectedCalled: 2,
existingTier: makeTestTier(5),
updateExpectedCalls: 2,
},
{
name: "no change needed",
existingTier: makeTestTier(10),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, c := newController(nil, nil)
if tc.reactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor)
ctx := context.Background()
crdObjects := []runtime.Object{}
if tc.existingTier != nil {
crdObjects = append(crdObjects, tc.existingTier)
}
_, c := newController(nil, crdObjects)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

if tc.createReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.createReactor)
}
if tc.updateReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", tc.updateReactor)
}
createCalled := 0
createCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
createCalled += 1
createCalls += 1
return false, nil, nil
})
c.initTier(testTier)
assert.Equal(t, tc.expectedCalled, createCalled)
updateCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
updateCalls += 1
return false, nil, nil
})
// Prevent test from hanging in case of issue.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
require.NoError(t, c.initializeTier(ctx, testTier))
assert.Equal(t, tc.createExpectedCalls, createCalls)
assert.Equal(t, tc.updateExpectedCalls, updateCalls)
})
}

}

func TestInitializeTiers(t *testing.T) {
ctx := context.Background()

_, c := newController(nil, nil)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

// All system Tiers should be created on the first try, so we can use a small timeout.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
require.NoError(t, c.InitializeTiers(ctx))
tiers, err := c.crdClient.CrdV1beta1().Tiers().List(ctx, metav1.ListOptions{})
require.NoError(t, err)
assert.Len(t, tiers.Items, len(systemGeneratedTiers))
}
Loading