Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

kafkasource: change bootstrapServers and Topics to accept arrays #1156

Merged
merged 5 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ event sink.
consumerGroup: knative-group
# Broker URL. Replace this with the URLs for your kafka cluster,
# which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
bootstrapServers: REPLACE_WITH_CLUSTER_URL
topics: knative-demo-topic
bootstrapServers:
- REPLACE_WITH_CLUSTER_URL
lberk marked this conversation as resolved.
Show resolved Hide resolved
topics:
- knative-demo-topic
lberk marked this conversation as resolved.
Show resolved Hide resolved
sink:
ref:
apiVersion: serving.knative.dev/v1alpha1
Expand Down
4 changes: 2 additions & 2 deletions kafka/source/config/300-kafkasource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ spec:
spec:
properties:
bootstrapServers:
type: string
type: array
minLength: 1
topics:
type: string
type: array
minLength: 1
consumerGroup:
type: string
Expand Down
6 changes: 4 additions & 2 deletions kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
for i := range spec.InitContainers {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: kfb.Spec.BootstrapServers,
Value: strings.Join(kfb.Spec.BootstrapServers, ","),
})
if kfb.Spec.Net.SASL.Enable {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Expand Down Expand Up @@ -124,8 +125,9 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
for i := range spec.Containers {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: kfb.Spec.BootstrapServers,
Value: strings.Join(kfb.Spec.BootstrapServers, ","),
})

if kfb.Spec.Net.SASL.Enable {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ENABLE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func TestKafkaBindingDoSASL(t *testing.T) {
vsb := &KafkaBinding{
Spec: KafkaBindingSpec{
KafkaAuthSpec: KafkaAuthSpec{
BootstrapServers: url.String(),
BootstrapServers: []string{
url.String(),
},
Net: KafkaNetSpec{
SASL: KafkaSASLSpec{
Enable: true,
Expand Down Expand Up @@ -499,7 +501,9 @@ func TestKafkaBindingDoTLS(t *testing.T) {
vsb := &KafkaBinding{
Spec: KafkaBindingSpec{
KafkaAuthSpec: KafkaAuthSpec{
BootstrapServers: url.String(),
BootstrapServers: []string{
url.String(),
},
Net: KafkaNetSpec{
TLS: KafkaTLSSpec{
Enable: true,
Expand Down
2 changes: 1 addition & 1 deletion kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type KafkaNetSpec struct {
type KafkaAuthSpec struct {
// Bootstrap servers are the Kafka servers the consumer will connect to.
// +required
BootstrapServers string `json:"bootstrapServers"`
BootstrapServers []string `json:"bootstrapServers"`

Net KafkaNetSpec `json:"net,omitempty"`
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type KafkaSourceSpec struct {

// Topic topics to consume messages from
// +required
Topics string `json:"topics"`
Topics []string `json:"topics"`

// ConsumerGroupID is the consumer group ID.
// +optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
var (
fullSpec = KafkaSourceSpec{
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "servers",
BootstrapServers: []string{"servers"},
},
Topics: "topics",
Topics: []string{"topics"},
ConsumerGroup: "group",
Sink: &duckv1.Destination{
Ref: &duckv1.KReference{
Expand All @@ -57,7 +57,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
"Topic changed": {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: "some-other-topic",
Topics: []string{"some-other-topic"},
Sink: fullSpec.Sink,
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -67,7 +67,7 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
Sink: fullSpec.Sink,
ServiceAccountName: fullSpec.ServiceAccountName,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions kafka/source/pkg/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,15 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) {
}

func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes {
topics := strings.Split(src.Spec.Topics, ",")
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(topics))
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: v1alpha1.KafkaEventType,
Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
})
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics))
for i := range src.Spec.Topics {
topics := strings.Split(src.Spec.Topics[i], ",")
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: v1alpha1.KafkaEventType,
Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
})
}
}
return ceAttributes
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package resources
import (
"fmt"
"strconv"
"strings"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -43,10 +44,10 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {

env := []corev1.EnvVar{{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Value: args.Source.Spec.BootstrapServers,
Value: strings.Join(args.Source.Spec.BootstrapServers, ","),
}, {
Name: "KAFKA_TOPICS",
Value: args.Source.Spec.Topics,
Value: strings.Join(args.Source.Spec.Topics, ","),
}, {
Name: "KAFKA_CONSUMER_GROUP",
Value: args.Source.Spec.ConsumerGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func TestMakeReceiveAdapter(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
ConsumerGroup: "group",
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
Net: bindingsv1alpha1.KafkaNetSpec{
SASL: bindingsv1alpha1.KafkaSASLSpec{
Enable: true,
Expand Down Expand Up @@ -255,9 +255,9 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
ConsumerGroup: "group",
},
Expand Down Expand Up @@ -485,9 +485,9 @@ func TestMakeReceiveAdapterKeyType(t *testing.T) {
},
Spec: v1alpha1.KafkaSourceSpec{
ServiceAccountName: "source-svc-acct",
Topics: "topic1,topic2",
Topics: []string{"topic1,topic2"},
KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{
BootstrapServers: "server1,server2",
BootstrapServers: []string{"server1,server2"},
},
ConsumerGroup: "group",
},
Expand Down
6 changes: 3 additions & 3 deletions test/lib/resources/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func KafkaSource(bootstrapServer string, topicName string, ref *corev1.ObjectRef
},
Spec: kafkasourcev1alpha1.KafkaSourceSpec{
KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{
BootstrapServers: bootstrapServer,
BootstrapServers: []string{bootstrapServer},
},
Topics: topicName,
Topics: []string{topicName},
ConsumerGroup: "test-consumer-group",
Sink: &duckv1.Destination{
Ref: &duckv1.KReference{
Expand All @@ -108,7 +108,7 @@ func KafkaBinding(bootstrapServer string, ref *tracker.Reference) *kafkabindingv
},
Spec: kafkabindingv1alpha1.KafkaBindingSpec{
KafkaAuthSpec: kafkabindingv1alpha1.KafkaAuthSpec{
BootstrapServers: bootstrapServer,
BootstrapServers: []string{bootstrapServer},
},
BindingSpec: duckv1alpha1.BindingSpec{
Subject: *ref,
Expand Down