From 5539ad9cc241ac1db4544ae8f9a4d07079efc090 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 21 Apr 2020 17:12:13 -0400 Subject: [PATCH 1/5] kafkasource: change bootstrapServers and Topics to accept arrays Each item in the can be comma separated --- kafka/source/config/300-kafkasource.yaml | 4 ++-- .../apis/bindings/v1alpha1/kafka_lifecycle.go | 6 ++++-- .../pkg/apis/bindings/v1alpha1/kafka_types.go | 2 +- .../bindings/v1alpha1/zz_generated.deepcopy.go | 5 +++++ .../pkg/apis/sources/v1alpha1/kafka_types.go | 2 +- .../sources/v1alpha1/zz_generated.deepcopy.go | 5 +++++ .../source/pkg/reconciler/source/kafkasource.go | 16 +++++++++------- .../source/resources/receive_adapter.go | 5 +++-- 8 files changed, 30 insertions(+), 15 deletions(-) diff --git a/kafka/source/config/300-kafkasource.yaml b/kafka/source/config/300-kafkasource.yaml index b2465583bb..71264c7879 100644 --- a/kafka/source/config/300-kafkasource.yaml +++ b/kafka/source/config/300-kafkasource.yaml @@ -61,10 +61,10 @@ spec: spec: properties: bootstrapServers: - type: string + type: array minLength: 1 topics: - type: string + type: array minLength: 1 consumerGroup: type: string diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go index 349bab5a18..c73fb0cce9 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "context" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -80,7 +81,7 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { for i := range spec.InitContainers { spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{ Name: "KAFKA_BOOTSTRAP_SERVERS", - Value: kfb.Spec.BootstrapServers, + Value: strings.Join(kfb.Spec.BootstrapServers, ","), }) if kfb.Spec.Net.SASL.Enable { spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{ @@ -124,8 +125,9 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { for i := range spec.Containers { spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{ Name: "KAFKA_BOOTSTRAP_SERVERS", - Value: kfb.Spec.BootstrapServers, + Value: strings.Join(kfb.Spec.BootstrapServers, ","), }) + if kfb.Spec.Net.SASL.Enable { spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{ Name: "KAFKA_NET_SASL_ENABLE", diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go index 9896d3b0c9..1903d59da9 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go @@ -85,7 +85,7 @@ type KafkaNetSpec struct { type KafkaAuthSpec struct { // Bootstrap servers are the Kafka servers the consumer will connect to. // +required - BootstrapServers string `json:"bootstrapServers"` + BootstrapServers []string `json:"bootstrapServers"` Net KafkaNetSpec `json:"net,omitempty"` } diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go b/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go index b9036dc626..660dce5da1 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go @@ -28,6 +28,11 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaAuthSpec) DeepCopyInto(out *KafkaAuthSpec) { *out = *in + if in.BootstrapServers != nil { + in, out := &in.BootstrapServers, &out.BootstrapServers + *out = make([]string, len(*in)) + copy(*out, *in) + } in.Net.DeepCopyInto(&out.Net) return } diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go index c502bfd504..7f6419b80b 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go @@ -70,7 +70,7 @@ type KafkaSourceSpec struct { // Topic topics to consume messages from // +required - Topics string `json:"topics"` + Topics []string `json:"topics"` // ConsumerGroupID is the consumer group ID. // +optional diff --git a/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 8f10214236..382c218ba8 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -140,6 +140,11 @@ func (in *KafkaSourceList) DeepCopyObject() runtime.Object { func (in *KafkaSourceSpec) DeepCopyInto(out *KafkaSourceSpec) { *out = *in in.KafkaAuthSpec.DeepCopyInto(&out.KafkaAuthSpec) + if in.Topics != nil { + in, out := &in.Topics, &out.Topics + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Sink != nil { in, out := &in.Sink, &out.Sink *out = new(v1.Destination) diff --git a/kafka/source/pkg/reconciler/source/kafkasource.go b/kafka/source/pkg/reconciler/source/kafkasource.go index 02a8145c98..8882c50e8f 100644 --- a/kafka/source/pkg/reconciler/source/kafkasource.go +++ b/kafka/source/pkg/reconciler/source/kafkasource.go @@ -294,13 +294,15 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { } func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes { - topics := strings.Split(src.Spec.Topics, ",") - ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(topics)) - for _, topic := range topics { - ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{ - Type: v1alpha1.KafkaEventType, - Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic), - }) + ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics)) + for i := range src.Spec.Topics { + topics := strings.Split(src.Spec.Topics[i], ",") + for _, topic := range topics { + ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{ + Type: v1alpha1.KafkaEventType, + Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic), + }) + } } return ceAttributes } diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go index 784c0e1554..955030ff91 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go @@ -19,6 +19,7 @@ package resources import ( "fmt" "strconv" + "strings" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -43,10 +44,10 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { env := []corev1.EnvVar{{ Name: "KAFKA_BOOTSTRAP_SERVERS", - Value: args.Source.Spec.BootstrapServers, + Value: strings.Join(args.Source.Spec.BootstrapServers, ","), }, { Name: "KAFKA_TOPICS", - Value: args.Source.Spec.Topics, + Value: strings.Join(args.Source.Spec.Topics, ","), }, { Name: "KAFKA_CONSUMER_GROUP", Value: args.Source.Spec.ConsumerGroup, From fadbbb49444ec3900fbada5370833e7d396920fa Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 22 Apr 2020 09:52:32 -0400 Subject: [PATCH 2/5] Fix unit tests --- .../apis/bindings/v1alpha1/kafka_lifecycle_test.go | 8 ++++++-- .../apis/sources/v1alpha1/kafka_validation_test.go | 8 ++++---- .../source/resources/receive_adapter_test.go | 12 ++++++------ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go index c7e3ddb7f3..f843d6d5ff 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go @@ -243,7 +243,9 @@ func TestKafkaBindingDoSASL(t *testing.T) { vsb := &KafkaBinding{ Spec: KafkaBindingSpec{ KafkaAuthSpec: KafkaAuthSpec{ - BootstrapServers: url.String(), + BootstrapServers: []string{ + url.String(), + }, Net: KafkaNetSpec{ SASL: KafkaSASLSpec{ Enable: true, @@ -499,7 +501,9 @@ func TestKafkaBindingDoTLS(t *testing.T) { vsb := &KafkaBinding{ Spec: KafkaBindingSpec{ KafkaAuthSpec: KafkaAuthSpec{ - BootstrapServers: url.String(), + BootstrapServers: []string{ + url.String(), + }, Net: KafkaNetSpec{ TLS: KafkaTLSSpec{ Enable: true, diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go index 81af6dc277..6236f2602a 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go @@ -28,9 +28,9 @@ import ( var ( fullSpec = KafkaSourceSpec{ KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ - BootstrapServers: "servers", + BootstrapServers: []string{"servers"}, }, - Topics: "topics", + Topics: []string{"topics"}, ConsumerGroup: "group", Sink: &duckv1.Destination{ Ref: &duckv1.KReference{ @@ -57,7 +57,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { "Topic changed": { orig: &fullSpec, updated: KafkaSourceSpec{ - Topics: "some-other-topic", + Topics: []string{"some-other-topic"}, Sink: fullSpec.Sink, ServiceAccountName: fullSpec.ServiceAccountName, }, @@ -67,7 +67,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ - BootstrapServers: "server1,server2", + BootstrapServers: []string{"server1,server2"}, }, Sink: fullSpec.Sink, ServiceAccountName: fullSpec.ServiceAccountName, diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go index e286f3445e..794d4337a0 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go @@ -36,10 +36,10 @@ func TestMakeReceiveAdapter(t *testing.T) { }, Spec: v1alpha1.KafkaSourceSpec{ ServiceAccountName: "source-svc-acct", - Topics: "topic1,topic2", + Topics: []string{"topic1,topic2"}, ConsumerGroup: "group", KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ - BootstrapServers: "server1,server2", + BootstrapServers: []string{"server1,server2"}, Net: bindingsv1alpha1.KafkaNetSpec{ SASL: bindingsv1alpha1.KafkaSASLSpec{ Enable: true, @@ -255,9 +255,9 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) { }, Spec: v1alpha1.KafkaSourceSpec{ ServiceAccountName: "source-svc-acct", - Topics: "topic1,topic2", + Topics: []string{"topic1,topic2"}, KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ - BootstrapServers: "server1,server2", + BootstrapServers: []string{"server1,server2"}, }, ConsumerGroup: "group", }, @@ -485,9 +485,9 @@ func TestMakeReceiveAdapterKeyType(t *testing.T) { }, Spec: v1alpha1.KafkaSourceSpec{ ServiceAccountName: "source-svc-acct", - Topics: "topic1,topic2", + Topics: []string{"topic1,topic2"}, KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ - BootstrapServers: "server1,server2", + BootstrapServers: []string{"server1,server2"}, }, ConsumerGroup: "group", }, From b0209e1369c0530faf378d330be4ce16adb4f60f Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 22 Apr 2020 09:55:08 -0400 Subject: [PATCH 3/5] Fix integration tests --- test/lib/resources/kafka.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/lib/resources/kafka.go b/test/lib/resources/kafka.go index f420e07a6e..b48da2ba6a 100644 --- a/test/lib/resources/kafka.go +++ b/test/lib/resources/kafka.go @@ -85,9 +85,9 @@ func KafkaSource(bootstrapServer string, topicName string, ref *corev1.ObjectRef }, Spec: kafkasourcev1alpha1.KafkaSourceSpec{ KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{ - BootstrapServers: bootstrapServer, + BootstrapServers: []string{bootstrapServer}, }, - Topics: topicName, + Topics: []string{topicName}, ConsumerGroup: "test-consumer-group", Sink: &duckv1.Destination{ Ref: &duckv1.KReference{ @@ -108,7 +108,7 @@ func KafkaBinding(bootstrapServer string, ref *tracker.Reference) *kafkabindingv }, Spec: kafkabindingv1alpha1.KafkaBindingSpec{ KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{ - BootstrapServers: bootstrapServer, + BootstrapServers: []string{bootstrapServer}, }, BindingSpec: duckv1alpha1.BindingSpec{ Subject: *ref, From 30842d03c97a7f993a40f6bb5e14b122ae459599 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 22 Apr 2020 11:41:53 -0400 Subject: [PATCH 4/5] Update README.md --- kafka/source/README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index d338cfac36..2a22084e7a 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -38,8 +38,10 @@ event sink. consumerGroup: knative-group # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. - bootstrapServers: REPLACE_WITH_CLUSTER_URL - topics: knative-demo-topic + bootstrapServers: + - REPLACE_WITH_CLUSTER_URL + topics: + - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1alpha1 From 82f4c2cf1510ad1e96caaebad2ade96408a5557b Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 22 Apr 2020 11:44:24 -0400 Subject: [PATCH 5/5] Fix lint --- kafka/source/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 2a22084e7a..d0e927efd8 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -39,9 +39,9 @@ event sink. # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrapServers: - - REPLACE_WITH_CLUSTER_URL + - REPLACE_WITH_CLUSTER_URL topics: - - knative-demo-topic + - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1alpha1