Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kafka output] removed regex validation to allow dynamic topic #40415

Merged
merged 11 commits into from
Aug 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved

*Auditbeat*

Expand Down
10 changes: 0 additions & 10 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"math"
"math/rand"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -109,11 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

// validTopicRegExp is used to validate the topic contains only valid characters
// when running under Elastic-Agent. The regexp is taken from:
// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33
var validTopicRegExp = regexp.MustCompile("^[a-zA-Z0-9._-]+$")

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -193,10 +187,6 @@ func (c *kafkaConfig) Validate() error {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}

if !validTopicRegExp.MatchString(c.Topic) {
return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic)
}
}

return nil
Expand Down
19 changes: 2 additions & 17 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,10 @@ func TestConfigUnderElasticAgent(t *testing.T) {
expectError: true,
},
{
name: "topic cannot contain invalid characters",
name: "valid topic with dynamic topic selection",
cfg: mapstr.M{
"topic": "foo bar",
"topic": "%{[event.field]}",
},
expectError: true,
},
{
name: "topic with invalid characters",
cfg: mapstr.M{
"topic": "foo + bar",
},
expectError: true,
},
{
name: "topic with invalid characters from dynamic topic selection",
cfg: mapstr.M{
"topic": "%{event.field}",
},
expectError: true,
},

// The default config does not set `topic` not `topics`.
Expand Down
17 changes: 3 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Copy link
Contributor Author

@juliaElastic juliaElastic Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying to undo the changes added here: https://github.com/elastic/beats/pull/37902/files#diff-d8e7a8c5739c80fb30e5c2f22ff87b2c8d476d8e9b71c222f2bcd6b4a6d0c3d6
though the dynamic topic resolution still doesn't seem to work

tested with standalone agent, but the topic resolution doesn't seem to work:

{"log.level":"error","@timestamp":"2024-08-02T12:01:28.385Z","message":"Kafka (topic=%{[data_stream.dataset]}): kafka server: The request attempted to perform an operation on an invalid topic.","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-e1d038f6-44ea-4a4e-bd8c-e03fc4b224ac","type":"log"},"log":{"source":"log-e1d038f6-44ea-4a4e-bd8c-e03fc4b224ac"},"log.logger":"kafka","log.origin":{"file.line":338,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"filebeat","ecs.version":"1.6.0","ecs.version":"1.6.0"}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmacknz Can I get some help/review on this? I'm not sure why the dynamic topic is not being resolved in agent. Tried to add debug logs locally, but I'm not seeing the buildTopicSelector function being triggered.

Copy link
Member

@cmacknz cmacknz Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I see some things to adjust but nothing that would cause that function not to be hit at all.

How are you testing this? You now need to build x-pack/agentbeat and copy over the resulting agentbeat binary, so make sure you are doing that and not copying one of the beat binaries directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't copy anything, I thought the EXTERNAL=false flag should build and use the local beats repo. I'll try copying it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay I could build beats, copied over to agent and now I can see the dynamic topic resolution works as expected, I'll update the description with screenshots

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably EXTERNAL=false isn't updated to be aware of agentbeat yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I missed the part where the beats are not rebuilt if it was built already, and the step to copy it over to agent.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
Expand Down Expand Up @@ -91,21 +90,11 @@ func makeKafka(
// running under Elastic-Agent based on cfg.
//
// When running standalone the topic selector works as expected and documented.
// When running under Elastic-Agent, dynamic topic selection is not supported,
// so a constant selector using the `topic` value is returned.
// When running under Elastic-Agent, dynamic topic selection is also supported
func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
topicCfg := struct {
Topic string `config:"topic" yaml:"topic"`
}{}

if err := cfg.Unpack(&topicCfg); err != nil {
return outil.Selector{}, fmt.Errorf("cannot unpack Kafka config to read the topic: %w", err)
}

if management.UnderAgent() {
exprSelector := outil.ConstSelectorExpr(topicCfg.Topic, outil.SelectorKeepCase)
selector := outil.MakeSelector(exprSelector)
return selector, nil
if cfg == nil {
return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil")
}

return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBuildTopicSelector(t *testing.T) {
{
name: "dynamic topic under agent",
topic: "%{[foo]}",
expected: "%{[foo]}",
expected: "bar",
underAgent: true,
},
{
Expand Down
Loading