Skip to content

Commit

Permalink
feat(rollingupgrade): allow concurrent broker restarts within same br…
Browse files Browse the repository at this point in the history
…oker rack (#1001)

Co-authored-by: Lucian Ilie <[email protected]>
Co-authored-by: Darren Lau <[email protected]>
  • Loading branch information
3 people authored Aug 4, 2023
1 parent eb441d8 commit 4329189
Show file tree
Hide file tree
Showing 14 changed files with 1,229 additions and 36 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,8 @@ mock-generate: bin/mockgen
-package mocks \
-destination pkg/resources/kafka/mocks/Client.go \
sigs.k8s.io/controller-runtime/pkg/client Client
$(BIN_DIR)/mockgen \
-copyright_file $(BOILERPLATE_DIR)/header.generated.txt \
-package mocks \
-destination pkg/resources/kafka/mocks/KafkaClient.go \
-source pkg/kafkaclient/client.go
8 changes: 8 additions & 0 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ spec:
# alerts with 'rollingupgrade'
# failureThreshold: 1

# concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
# it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
# brokers are within the same rack (as specified by "broker.rack" in the broker read-only configs). Since using Kafka
# broker racks spreads out the replicas, restarting multiple brokers in the same rack will not cause more than
# 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
# This makes it a safe way to speed up the rolling upgrade.
# concurrentBrokerRestartsAllowed: 1

# brokerConfigGroups specifies multiple broker configs with unique name
brokerConfigGroups:
# Specify desired group name (eg., 'default_group')
Expand Down
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1},
},
}
}
Expand Down
14 changes: 7 additions & 7 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an add_broker operation for execution", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock1())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock2())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -132,7 +132,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock5())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -161,7 +161,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an errored remove_broker and an add_broker operation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock3())
// First operation will get completedWithError
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -208,7 +208,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Labels["pause"] = "true"
err := k8sClient.Create(ctx, &operation)
Expand Down Expand Up @@ -257,7 +257,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4())
// Creating first operation
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore
Expand Down Expand Up @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
Expand Down
16 changes: 5 additions & 11 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
When("new storage is added", Serial, func() {

JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))

err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -227,7 +227,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath}))
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -312,7 +312,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("new broker is added", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
Expand Down Expand Up @@ -400,7 +400,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("a broker is removed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1())
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down Expand Up @@ -447,12 +447,6 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})

// NewMockScaleFactory wraps the given scaler in a factory function. The returned factory disregards both of its
// arguments and always hands back the same scaler together with a nil error.
func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
	factory := func(context.Context, *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
		return mock, nil
	}
	return factory
}

func getScaleMockCCTask1() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
Expand Down
28 changes: 28 additions & 0 deletions controllers/tests/mocks/scale_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mocks

import (
"context"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/scale"
)

// NewMockScaleFactory returns a scale factory suitable for tests: whatever context and KafkaCluster it is invoked
// with, it yields the supplied scaler and never reports an error.
func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
	// The factory arguments are intentionally unused; only the captured scaler matters.
	alwaysMock := func(_ context.Context, _ *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) {
		return mock, nil
	}
	return alwaysMock
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/banzaicloud/istio-operator/api/v2 v2.15.1
github.com/banzaicloud/k8s-objectmatcher v1.8.0
github.com/banzaicloud/koperator/api v0.28.6
github.com/banzaicloud/koperator/api v0.28.7
github.com/banzaicloud/koperator/properties v0.4.1
github.com/cert-manager/cert-manager v1.11.2
github.com/cisco-open/cluster-registry-controller/api v0.2.5
Expand Down Expand Up @@ -38,6 +38,7 @@ require (
require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/tools v0.7.0 // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/banzaicloud/koperator/api v0.28.6 h1:ZsOAXAsg34O78qVCEHx84cdp57HlCje6zjzXHhvtXf4=
github.com/banzaicloud/koperator/api v0.28.6/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE=
github.com/banzaicloud/koperator/api v0.28.7 h1:G6ICLzuz6Tumcsl9ZaqZ46ccwdAc1rXjidP03v6Kqp4=
github.com/banzaicloud/koperator/api v0.28.7/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE=
github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc=
github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA=
github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM=
Expand Down Expand Up @@ -433,6 +435,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
11 changes: 11 additions & 0 deletions pkg/kafkaclient/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package kafkaclient

import (
"github.com/stretchr/testify/mock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"
Expand Down Expand Up @@ -45,3 +46,13 @@ func NewDefaultProvider() Provider {
// NewFromCluster delegates to the package-level NewFromCluster function and passes its three results through
// unchanged.
func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	kc, closeFn, err := NewFromCluster(client, cluster)
	return kc, closeFn, err
}

// MockedProvider is a Testify mock for providing Kafka clients that can be mocks too.
//
// It embeds mock.Mock, so tests register expectations on it (for the NewFromCluster method) and choose which
// KafkaClient, close function and error are handed back to the code under test.
type MockedProvider struct {
mock.Mock
}

// NewFromCluster records the call on the embedded mock and returns the values the test configured for it.
//
// The first two configured return values may be an untyped nil, for example when a test only exercises the error
// path with Return(nil, nil, err). In that case a nil KafkaClient and/or a nil close function is returned instead
// of panicking on the type assertion. A non-nil value of an unexpected type still panics, which surfaces mistakes
// in the test setup.
func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	args := m.Called(client, cluster)

	// Asserting directly on an untyped nil interface value would panic, so guard both assertions.
	var mockedClient KafkaClient
	if v := args.Get(0); v != nil {
		mockedClient = v.(KafkaClient)
	}

	var closeFn func()
	if v := args.Get(1); v != nil {
		closeFn = v.(func())
	}

	return mockedClient, closeFn, args.Error(2)
}
Loading

0 comments on commit 4329189

Please sign in to comment.