Skip to content

Commit

Permalink
💫 use StringMap util to check for disabled controllers of Eventing Ka…
Browse files Browse the repository at this point in the history
…fka (#1371)

Signed-off-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
matzew authored Jan 11, 2022
1 parent 0bc14e7 commit bda392e
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.1
k8s.io/api v0.21.4
k8s.io/apimachinery v0.21.4
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
github.com/rickb777/plural v1.2.2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/tsenart/vegeta/v12 v12.8.4 // indirect
github.com/wavesoftware/go-ensure v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand Down
20 changes: 20 additions & 0 deletions knative-operator/pkg/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@ import (

var Log = logf.Log.WithName("knative").WithName("openshift")

// StringMap is a map which key and value are strings
type StringMap map[string]string

// Removes given slice from StringMap
func (m StringMap) Remove(toRemove string) StringMap {
delete(m, toRemove)
return m
}

// Gets StringMap values as comma separated string
func (m StringMap) StringValues() string {
values := make([]string, 0, len(m))

for _, v := range m {
values = append(values, v)
}

return strings.Join(values, ",")
}

// Configure is a helper to set a value for a key, potentially overriding existing contents.
func Configure(ks *operatorv1alpha1.KnativeServing, cm, key, value string) bool {
if ks.Spec.Config == nil {
Expand Down
16 changes: 16 additions & 0 deletions knative-operator/pkg/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/google/go-cmp/cmp"
"github.com/openshift-knative/serverless-operator/knative-operator/pkg/common"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -120,6 +122,20 @@ func environFromMap(envMap map[string]string) []string {
return e
}

func TestStringMap(t *testing.T) {
disabledKafkaControllers := common.StringMap{
"broker": "broker-controller,trigger-controller",
"sink": "sink-controller",
}
disabledKafkaControllers.Remove("broker")
assert.Equal(t, len(disabledKafkaControllers), 1)
assert.NotEmpty(t, disabledKafkaControllers)

disabledKafkaControllers.Remove("sink")
assert.Equal(t, len(disabledKafkaControllers), 0)
assert.Empty(t, disabledKafkaControllers)
}

func TestSetAnnotations(t *testing.T) {
cases := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ func (r *ReconcileKnativeKafka) deleteKnativeKafka(instance *serverlessoperatorv
type manifestBuild int

const (
broker = "BROKER"
sink = "SINK"
manifestBuildEnabledOnly manifestBuild = iota
manifestBuildDisabledOnly
manifestBuildAll
Expand Down Expand Up @@ -501,21 +503,28 @@ func configureEventingKafka(spec serverlessoperatorv1alpha1.KnativeKafkaSpec) mf
// patch the deployment and enable the relevant controllers
if u.GetKind() == "Deployment" && u.GetName() == "kafka-controller" {

var disabledKafkaControllers = common.StringMap{
broker: "broker-controller,trigger-controller",
sink: "sink-controller",
}

var deployment = &appsv1.Deployment{}
if err := scheme.Scheme.Convert(u, deployment, nil); err != nil {
return err
}

if spec.Broker.Enabled && spec.Sink.Enabled {
// all: nothing to disable
deployment.Spec.Template.Spec.Containers[0].Args = nil
} else if spec.Broker.Enabled {
// only Broker: we disable the sink controllers
deployment.Spec.Template.Spec.Containers[0].Args = []string{"--disable-controllers=sink-controller"}
} else if spec.Sink.Enabled {
// only sink: we disable the Broker controllers
deployment.Spec.Template.Spec.Containers[0].Args = []string{"--disable-controllers=broker-controller,trigger-controller"}
if spec.Broker.Enabled {
// broker is enabled, so we remove all of its controllers from the list of disabled controllers
disabledKafkaControllers.Remove(broker)
}
if spec.Sink.Enabled {
// only sink: we remove the Sink controllers from the list of disabled controllers
disabledKafkaControllers.Remove(sink)
}

// render the actual argument
// todo: if we have no disabled controllers left we should filter for the proper argument and remove just that!
deployment.Spec.Template.Spec.Containers[0].Args = []string{"--disable-controllers=" + disabledKafkaControllers.StringValues()}

return scheme.Scheme.Convert(deployment, u, nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,101 @@ func TestBrokerCfg(t *testing.T) {
}
}

func TestDisabledControllers(t *testing.T) {
tests := []struct {
name string
knativeKafka v1alpha1.KnativeKafkaSpec
expect *unstructured.Unstructured
}{{
name: "just broker",
knativeKafka: v1alpha1.KnativeKafkaSpec{
Broker: v1alpha1.Broker{
Enabled: true,
},
Sink: v1alpha1.Sink{
Enabled: false,
},
},
expect: makeEventingKafkaDeployment(t, "sink-controller"),
}, {
name: "just sink",
knativeKafka: v1alpha1.KnativeKafkaSpec{
Broker: v1alpha1.Broker{
Enabled: false,
},
Sink: v1alpha1.Sink{
Enabled: true,
},
},
expect: makeEventingKafkaDeployment(t, "broker-controller,trigger-controller"),
}, {
name: "broker and sink",
knativeKafka: v1alpha1.KnativeKafkaSpec{
Broker: v1alpha1.Broker{
Enabled: true,
},
Sink: v1alpha1.Sink{
Enabled: true,
},
},
expect: makeEventingKafkaDeployment(t, ""),
}, {
name: "no broker and no sink",
knativeKafka: v1alpha1.KnativeKafkaSpec{
Broker: v1alpha1.Broker{
Enabled: false,
},
Sink: v1alpha1.Sink{
Enabled: false,
},
},
expect: makeEventingKafkaDeployment(t, "broker-controller,trigger-controller,sink-controller"),
}}

for _, test := range tests {
defaultDeployment := makeEventingKafkaDeployment(t, "")
t.Run(test.name, func(t *testing.T) {
err := configureEventingKafka(test.knativeKafka)(defaultDeployment)
if err != nil {
t.Fatalf("configureKafkaBroker: (%v)", err)
}

if !cmp.Equal(test.expect, defaultDeployment) {
t.Fatalf("Resource wasn't what we expected, diff: %s", cmp.Diff(defaultDeployment, test.expect))
}
})
}
}

func makeEventingKafkaDeployment(t *testing.T, disabledControllers string) *unstructured.Unstructured {
d := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-controller",
},
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "controller",
},
},
},
},
},
}
d.Spec.Template.Spec.Containers[0].Args = []string{"--disable-controllers=" + disabledControllers}

result := &unstructured.Unstructured{}
err := scheme.Scheme.Convert(d, result, nil)
if err != nil {
t.Fatalf("Could not create unstructured Deployment: %v, err: %v", d, err)
}

return result

}

func marshalEventingKafkaConfig(kafka EventingKafkaConfig) string {
configBytes, _ := yaml.Marshal(kafka)
return string(configBytes)
Expand Down

0 comments on commit bda392e

Please sign in to comment.