Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding support for Kafka on Agent #3226

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/proto/orchestrator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ message TraceIdResponse {

message DataStoreConnectionTestRequest {
string requestID = 1;
DataStore datastore = 5;
DataStore datastore = 2;
}

message DataStoreConnectionTestResponse {
Expand Down
50 changes: 50 additions & 0 deletions agent/workers/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork
registry.Add(agentTrigger.HTTP())
registry.Add(agentTrigger.GRPC())
registry.Add(agentTrigger.TRACEID())
registry.Add(agentTrigger.KAFKA())

worker := &TriggerWorker{
client: client,
Expand Down Expand Up @@ -94,6 +95,7 @@ func convertProtoToTrigger(pt *proto.Trigger) trigger.Trigger {
HTTP: convertProtoHttpTriggerToHttpTrigger(pt.Http),
GRPC: convertProtoGrpcTriggerToGrpcTrigger(pt.Grpc),
TraceID: convertProtoTraceIDTriggerToTraceIDTrigger(pt.TraceID),
Kafka: convertProtoKafkaTriggerToKafkaTrigger(pt.Kafka),
}
}

Expand Down Expand Up @@ -189,6 +191,42 @@ func convertProtoTraceIDTriggerToTraceIDTrigger(traceIDRequest *proto.TraceIDReq
}
}

// convertProtoKafkaTriggerToKafkaTrigger maps the gRPC/proto Kafka trigger
// request onto the internal trigger representation.
// It returns nil when the request carries no Kafka trigger.
func convertProtoKafkaTriggerToKafkaTrigger(kafkaRequest *proto.KafkaRequest) *trigger.KafkaRequest {
	if kafkaRequest == nil {
		return nil
	}

	// BUG FIX: the original ranged over the freshly-made (zero-valued)
	// `headers` slice instead of the source, so every converted header
	// ended up with empty Key/Value. Range over kafkaRequest.Headers.
	headers := make([]trigger.KafkaMessageHeader, len(kafkaRequest.Headers))
	for i, h := range kafkaRequest.Headers {
		headers[i] = trigger.KafkaMessageHeader{Key: h.Key, Value: h.Value}
	}

	return &trigger.KafkaRequest{
		BrokerURLs:      kafkaRequest.BrokerUrls,
		Topic:           kafkaRequest.Topic,
		Headers:         headers,
		Authentication:  convertProtoKafkaAuthToKafkaAuth(kafkaRequest.Authentication),
		MessageKey:      kafkaRequest.MessageKey,
		MessageValue:    kafkaRequest.MessageValue,
		SSLVerification: kafkaRequest.SslVerification,
	}
}

// convertProtoKafkaAuthToKafkaAuth maps the proto Kafka authentication
// payload onto the internal authenticator representation.
// It returns nil when no authentication was provided.
func convertProtoKafkaAuthToKafkaAuth(kafkaAuthentication *proto.KafkaAuthentication) *trigger.KafkaAuthenticator {
	if kafkaAuthentication == nil {
		return nil
	}

	auth := &trigger.KafkaAuthenticator{
		Type: kafkaAuthentication.Type,
	}

	// BUG FIX: the original dereferenced kafkaAuthentication.Plain
	// unconditionally and would panic on a message that sets only Type.
	// A nil Plain is already tolerated downstream (getConfig checks for it),
	// so guarding here is backward-compatible.
	if kafkaAuthentication.Plain != nil {
		auth.Plain = &trigger.KafkaPlainAuthenticator{
			Username: kafkaAuthentication.Plain.Username,
			Password: kafkaAuthentication.Plain.Password,
		}
	}

	return auth
}

func convertResponseToProtoResponse(request *proto.TriggerRequest, response agentTrigger.Response) *proto.TriggerResponse {
return &proto.TriggerResponse{
TestID: request.TestID,
Expand All @@ -198,6 +236,7 @@ func convertResponseToProtoResponse(request *proto.TriggerRequest, response agen
Http: convertHttpResponseToProto(response.Result.HTTP),
Grpc: convertGrpcResponseToProto(response.Result.GRPC),
TraceID: convertTraceIDResponseToProto(response.Result.TraceID),
Kafka: convertKafkaResponseToProto(response.Result.Kafka),
},
}
}
Expand Down Expand Up @@ -246,3 +285,14 @@ func convertTraceIDResponseToProto(traceID *trigger.TraceIDResponse) *proto.Trac
Id: traceID.ID,
}
}

// convertKafkaResponseToProto maps the internal Kafka trigger result onto its
// proto counterpart. A missing result or an empty offset means no message was
// produced, in which case nil is returned.
func convertKafkaResponseToProto(kafka *trigger.KafkaResponse) *proto.KafkaResponse {
	if kafka != nil && kafka.Offset != "" {
		return &proto.KafkaResponse{
			Partition: kafka.Partition,
			Offset:    kafka.Offset,
		}
	}

	return nil
}
76 changes: 76 additions & 0 deletions agent/workers/trigger/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package trigger

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/server/pkg/kafka"
"github.com/kubeshop/tracetest/server/test/trigger"
"go.opentelemetry.io/otel/propagation"
)

// KAFKA builds the Triggerer responsible for executing Kafka trigger requests.
func KAFKA() Triggerer {
	var triggerer kafkaTriggerer
	return &triggerer
}

// kafkaTriggerer executes a test trigger by producing a message on a Kafka topic.
type kafkaTriggerer struct{}

// Trigger produces a single message on the configured Kafka topic and records
// the partition and offset the broker assigned to it, both on the result and
// as span attributes. The producer is created per invocation and closed before
// returning.
func (t *kafkaTriggerer) Trigger(ctx context.Context, triggerConfig trigger.Trigger, opts *Options) (Response, error) {
	response := Response{
		Result: trigger.TriggerResult{
			Type: t.Type(),
		},
	}

	request := triggerConfig.Kafka

	producer, err := kafka.GetProducer(t.getConfig(request))
	if err != nil {
		return response, fmt.Errorf("error when creating kafka producer: %w", err)
	}
	defer producer.Close()

	// Inject the current trace context into the message headers so the
	// consumer side can be correlated with this trigger's trace.
	headers := request.GetHeaderAsMap()
	propagators().Inject(ctx, propagation.MapCarrier(headers))

	result, err := producer.ProduceSyncMessage(ctx, request.MessageKey, request.MessageValue, headers)
	if err != nil {
		return response, fmt.Errorf("error when sending message to kafka producer: %w", err)
	}

	response.Result.Kafka = &trigger.KafkaResponse{
		Partition: result.Partition,
		Offset:    result.Offset,
	}

	response.SpanAttributes = map[string]string{
		"tracetest.run.trigger.kafka.partition": result.Partition,
		"tracetest.run.trigger.kafka.offset":    result.Offset,
	}

	return response, nil
}

// Type reports the trigger type this triggerer handles: Kafka.
func (t *kafkaTriggerer) Type() trigger.TriggerType {
	return trigger.TriggerTypeKafka
}

// getConfig translates the trigger request into the kafka client
// configuration, copying plain credentials only when they are present.
func (t *kafkaTriggerer) getConfig(request *trigger.KafkaRequest) kafka.Config {
	cfg := kafka.Config{
		BrokerURLs:      request.BrokerURLs,
		Topic:           request.Topic,
		SSLVerification: request.SSLVerification,
	}

	// Authentication is optional; attach it only when a plain
	// username/password payload was supplied.
	if auth := request.Authentication; auth != nil && auth.Plain != nil {
		cfg.Authentication = &kafka.AuthenticationConfig{
			Username: auth.Plain.Username,
			Password: auth.Plain.Password,
		}
	}

	return cfg
}
Loading