Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

kafkasource: change bootstrapServers and Topics to accept arrays #1156

Merged
merged 5 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ event sink.
consumerGroup: knative-group
# Broker URL. Replace this with the URLs for your kafka cluster,
# which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
bootstrapServers: REPLACE_WITH_CLUSTER_URL
topics: knative-demo-topic
bootstrapServers:
- REPLACE_WITH_CLUSTER_URL
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1alpha1
Expand Down
4 changes: 2 additions & 2 deletions kafka/source/config/300-kafkasource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ spec:
spec:
properties:
bootstrapServers:
type: string
type: array
minLength: 1
topics:
type: string
type: array
minLength: 1
consumerGroup:
type: string
Expand Down
6 changes: 4 additions & 2 deletions kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
for i := range spec.InitContainers {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: kfb.Spec.BootstrapServers,
Value: strings.Join(kfb.Spec.BootstrapServers, ","),
})
if kfb.Spec.Net.SASL.Enable {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Expand Down Expand Up @@ -124,8 +125,9 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
for i := range spec.Containers {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: kfb.Spec.BootstrapServers,
Value: strings.Join(kfb.Spec.BootstrapServers, ","),
})

if kfb.Spec.Net.SASL.Enable {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ENABLE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func TestKafkaBindingDoSASL(t *testing.T) {
vsb := &KafkaBinding{
Spec: KafkaBindingSpec{
KafkaAuthSpec: KafkaAuthSpec{
BootstrapServers: url.String(),
BootstrapServers: []string{
url.String(),
},
Net: KafkaNetSpec{
SASL: KafkaSASLSpec{
Enable: true,
Expand Down Expand Up @@ -499,7 +501,9 @@ func TestKafkaBindingDoTLS(t *testing.T) {
vsb := &KafkaBinding{
Spec: KafkaBindingSpec{
KafkaAuthSpec: KafkaAuthSpec{
BootstrapServers: url.String(),
BootstrapServers: []string{
url.String(),
},
Net: KafkaNetSpec{
TLS: KafkaTLSSpec{
Enable: true,
Expand Down
2 changes: 1 addition & 1 deletion kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type KafkaNetSpec struct {
type KafkaAuthSpec struct {
// Bootstrap servers are the Kafka servers the consumer will connect to.
// +required
BootstrapServers string `json:"bootstrapServers"`
BootstrapServers []string `json:"bootstrapServers"`

Net KafkaNetSpec `json:"net,omitempty"`
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type KafkaSourceSpec struct {

// Topic topics to consume messages from
// +required
Topics string `json:"topics"`
Topics []string `json:"topics"`

// ConsumerGroupID is the consumer group ID.
// +optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
var (
fullSpec = KafkaSourceSpec{
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "servers",
BootstrapServers: []string{"servers"},
},
Topics: "topics",
Topics: []string{"topics"},
ConsumerGroup: "group",
Sink: &duckv1.Destination{
Ref: &duckv1.KReference{
Expand All @@ -57,7 +57,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
"Topic changed": {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: "some-other-topic",
Topics: []string{"some-other-topic"},
Sink: fullSpec.Sink,
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -67,7 +67,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
Sink: fullSpec.Sink,
ServiceAccountName: fullSpec.ServiceAccountName,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions kafka/source/pkg/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,15 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) {
}

func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes {
topics := strings.Split(src.Spec.Topics, ",")
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(topics))
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: v1alpha1.KafkaEventType,
Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
})
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics))
for i := range src.Spec.Topics {
topics := strings.Split(src.Spec.Topics[i], ",")
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: v1alpha1.KafkaEventType,
Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
})
}
}
return ceAttributes
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package resources
import (
"fmt"
"strconv"
"strings"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -43,10 +44,10 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {

env := []corev1.EnvVar{{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: args.Source.Spec.BootstrapServers,
Value: strings.Join(args.Source.Spec.BootstrapServers, ","),
}, {
Name: "KAFKA_TOPICS",
Value: args.Source.Spec.Topics,
Value: strings.Join(args.Source.Spec.Topics, ","),
}, {
Name: "KAFKA_CONSUMER_GROUP",
Value: args.Source.Spec.ConsumerGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func TestMakeReceiveAdapter(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
ConsumerGroup: "group",
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
Net: bindingsv1alpha1.KafkaNetSpec{
SASL: bindingsv1alpha1.KafkaSASLSpec{
Enable: true,
Expand Down Expand Up @@ -255,9 +255,9 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
ConsumerGroup: "group",
},
Expand Down Expand Up @@ -485,9 +485,9 @@ func TestMakeReceiveAdapterKeyType(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
ConsumerGroup: "group",
},
Expand Down
6 changes: 3 additions & 3 deletions test/lib/resources/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func KafkaSource(bootstrapServer string, topicName string, ref *corev1.ObjectRef
},
Spec: kafkasourcev1alpha1.KafkaSourceSpec{
KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{
BootstrapServers: bootstrapServer,
BootstrapServers: []string{bootstrapServer},
},
Topics: topicName,
Topics: []string{topicName},
ConsumerGroup: "test-consumer-group",
Sink: &duckv1.Destination{
Ref: &duckv1.KReference{
Expand All @@ -108,7 +108,7 @@ func KafkaBinding(bootstrapServer string, ref *tracker.Reference) *kafkabindingv
},
Spec: kafkabindingv1alpha1.KafkaBindingSpec{
KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{
BootstrapServers: bootstrapServer,
BootstrapServers: []string{bootstrapServer},
},
BindingSpec: duckv1alpha1.BindingSpec{
Subject: *ref,
Expand Down