diff --git a/agent/proto/orchestrator.proto b/agent/proto/orchestrator.proto
index ac691276b0..7cf4a8851f 100644
--- a/agent/proto/orchestrator.proto
+++ b/agent/proto/orchestrator.proto
@@ -178,7 +178,7 @@ message TraceIdResponse {
 
 message DataStoreConnectionTestRequest {
   string requestID = 1;
-  DataStore datastore = 5;
+  DataStore datastore = 2;
 }
 
 message DataStoreConnectionTestResponse {
diff --git a/agent/workers/trigger.go b/agent/workers/trigger.go
index 7998a06235..66eefb2717 100644
--- a/agent/workers/trigger.go
+++ b/agent/workers/trigger.go
@@ -36,6 +36,7 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork
 	registry.Add(agentTrigger.HTTP())
 	registry.Add(agentTrigger.GRPC())
 	registry.Add(agentTrigger.TRACEID())
+	registry.Add(agentTrigger.KAFKA())
 
 	worker := &TriggerWorker{
 		client: client,
@@ -94,6 +95,7 @@ func convertProtoToTrigger(pt *proto.Trigger) trigger.Trigger {
 		HTTP:    convertProtoHttpTriggerToHttpTrigger(pt.Http),
 		GRPC:    convertProtoGrpcTriggerToGrpcTrigger(pt.Grpc),
 		TraceID: convertProtoTraceIDTriggerToTraceIDTrigger(pt.TraceID),
+		Kafka:   convertProtoKafkaTriggerToKafkaTrigger(pt.Kafka),
 	}
 }
 
@@ -189,6 +191,42 @@ func convertProtoTraceIDTriggerToTraceIDTrigger(traceIDRequest *proto.TraceIDReq
 	}
 }
 
+func convertProtoKafkaTriggerToKafkaTrigger(kafkaRequest *proto.KafkaRequest) *trigger.KafkaRequest {
+	if kafkaRequest == nil {
+		return nil
+	}
+
+	headers := make([]trigger.KafkaMessageHeader, len(kafkaRequest.Headers))
+
+	for i, h := range kafkaRequest.Headers {
+		headers[i] = trigger.KafkaMessageHeader{Key: h.Key, Value: h.Value}
+	}
+
+	return &trigger.KafkaRequest{
+		BrokerURLs:      kafkaRequest.BrokerUrls,
+		Topic:           kafkaRequest.Topic,
+		Headers:         headers,
+		Authentication:  convertProtoKafkaAuthToKafkaAuth(kafkaRequest.Authentication),
+		MessageKey:      kafkaRequest.MessageKey,
+		MessageValue:    kafkaRequest.MessageValue,
+		SSLVerification: kafkaRequest.SslVerification,
+	}
+}
+
+func convertProtoKafkaAuthToKafkaAuth(kafkaAuthentication *proto.KafkaAuthentication) *trigger.KafkaAuthenticator {
+	if kafkaAuthentication == nil || kafkaAuthentication.Plain == nil {
+		return nil
+	}
+
+	return &trigger.KafkaAuthenticator{
+		Type: kafkaAuthentication.Type,
+		Plain: &trigger.KafkaPlainAuthenticator{
+			Username: kafkaAuthentication.Plain.Username,
+			Password: kafkaAuthentication.Plain.Password,
+		},
+	}
+}
+
 func convertResponseToProtoResponse(request *proto.TriggerRequest, response agentTrigger.Response) *proto.TriggerResponse {
 	return &proto.TriggerResponse{
 		TestID: request.TestID,
@@ -198,6 +236,7 @@ func convertResponseToProtoResponse(request *proto.TriggerRequest, response agen
 			Http:    convertHttpResponseToProto(response.Result.HTTP),
 			Grpc:    convertGrpcResponseToProto(response.Result.GRPC),
 			TraceID: convertTraceIDResponseToProto(response.Result.TraceID),
+			Kafka:   convertKafkaResponseToProto(response.Result.Kafka),
 		},
 	}
 }
@@ -246,3 +285,14 @@
 		Id: traceID.ID,
 	}
 }
+
+func convertKafkaResponseToProto(kafka *trigger.KafkaResponse) *proto.KafkaResponse {
+	if kafka == nil || kafka.Offset == "" {
+		return nil
+	}
+
+	return &proto.KafkaResponse{
+		Partition: kafka.Partition,
+		Offset:    kafka.Offset,
+	}
+}
diff --git a/agent/workers/trigger/kafka.go b/agent/workers/trigger/kafka.go
new file mode 100644
index 0000000000..233bcc0b80
--- /dev/null
+++ b/agent/workers/trigger/kafka.go
@@ -0,0 +1,76 @@
+package trigger
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/kubeshop/tracetest/server/pkg/kafka"
"github.com/kubeshop/tracetest/server/test/trigger" + "go.opentelemetry.io/otel/propagation" +) + +func KAFKA() Triggerer { + return &kafkaTriggerer{} +} + +type kafkaTriggerer struct{} + +func (te *kafkaTriggerer) Trigger(ctx context.Context, triggerConfig trigger.Trigger, opts *Options) (Response, error) { + response := Response{ + Result: trigger.TriggerResult{ + Type: te.Type(), + }, + } + + kafkaTriggerRequest := triggerConfig.Kafka + kafkaConfig := te.getConfig(kafkaTriggerRequest) + + kafkaProducer, err := kafka.GetProducer(kafkaConfig) + if err != nil { + return response, fmt.Errorf("error when creating kafka producer: %w", err) + } + defer kafkaProducer.Close() + + messageHeaders := kafkaTriggerRequest.GetHeaderAsMap() + propagators().Inject(ctx, propagation.MapCarrier(messageHeaders)) + + result, err := kafkaProducer.ProduceSyncMessage(ctx, kafkaTriggerRequest.MessageKey, kafkaTriggerRequest.MessageValue, messageHeaders) + if err != nil { + return response, fmt.Errorf("error when sending message to kafka producer: %w", err) + } + + response.Result.Kafka = &trigger.KafkaResponse{ + Partition: result.Partition, + Offset: result.Offset, + } + + response.SpanAttributes = map[string]string{ + "tracetest.run.trigger.kafka.partition": result.Partition, + "tracetest.run.trigger.kafka.offset": result.Offset, + } + + return response, nil +} + +func (t *kafkaTriggerer) Type() trigger.TriggerType { + return trigger.TriggerTypeKafka +} + +func (t *kafkaTriggerer) getConfig(request *trigger.KafkaRequest) kafka.Config { + config := kafka.Config{ + BrokerURLs: request.BrokerURLs, + Topic: request.Topic, + SSLVerification: request.SSLVerification, + } + + if request.Authentication == nil || request.Authentication.Plain == nil { + return config + } + + config.Authentication = &kafka.AuthenticationConfig{ + Username: request.Authentication.Plain.Username, + Password: request.Authentication.Plain.Password, + } + + return config +}