Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Add SSL and AUTH username support for Redis input #40111

Merged
merged 14 commits into from
Aug 1, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add SSL support for Redis input. {pull}40111[40111]
gpop63 marked this conversation as resolved.
Show resolved Hide resolved

*Auditbeat*

Expand Down
30 changes: 18 additions & 12 deletions filebeat/input/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,31 @@
package redis

import (
"crypto/tls"
"time"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "redis",
},
Network: "tcp",
MaxConn: 10,
Password: "",
func defaultConfig() config {
return config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "redis",
},
Network: "tcp",
MaxConn: 10,
}
}

type config struct {
harvester.ForwarderConfig `config:",inline"`
Hosts []string `config:"hosts" validate:"required"`
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
Hosts []string `config:"hosts" validate:"required"`
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
TLS *tlscommon.Config `config:"ssl"`
tlsConfig *tls.Config
}
52 changes: 34 additions & 18 deletions filebeat/input/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package redis

import (
"time"

rd "github.com/gomodule/redigo/redis"

"github.com/elastic/beats/v7/filebeat/channel"
Expand All @@ -29,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

func init() {
Expand All @@ -51,13 +50,22 @@ type Input struct {
func NewInput(cfg *conf.C, connector channel.Connector, context input.Context) (input.Input, error) {
cfgwarn.Experimental("Redis slowlog input is enabled.")

config := defaultConfig
config := defaultConfig()

err := cfg.Unpack(&config)
if err != nil {
return nil, err
}

if config.TLS.IsEnabled() {
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
}

config.tlsConfig = tlsConfig.ToConfig()
}

out, err := connector.Connect(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -94,8 +102,7 @@ func (p *Input) Run() {

forwarder := harvester.NewForwarder(p.outlet)
for _, host := range p.config.Hosts {
pool := CreatePool(host, p.config.Password, p.config.Network,
p.config.MaxConn, p.config.IdleTimeout, p.config.IdleTimeout)
pool := CreatePool(host, p.config)

h, err := NewHarvester(pool.Get())
if err != nil {
Expand All @@ -121,28 +128,37 @@ func (p *Input) Wait() {}

// CreatePool creates a redis connection pool
// NOTE: This code is copied from the redis pool handling in metricbeat
func CreatePool(
host, password, network string,
maxConn int,
idleTimeout, connTimeout time.Duration,
) *rd.Pool {
func CreatePool(host string, cfg config) *rd.Pool {
return &rd.Pool{
MaxIdle: maxConn,
IdleTimeout: idleTimeout,
MaxIdle: cfg.MaxConn,
IdleTimeout: cfg.IdleTimeout,
Dial: func() (rd.Conn, error) {
c, err := rd.Dial(network, host,
rd.DialConnectTimeout(connTimeout),
rd.DialReadTimeout(connTimeout),
rd.DialWriteTimeout(connTimeout))
dialOptions := []rd.DialOption{
rd.DialUsername(cfg.Username),
rd.DialConnectTimeout(cfg.IdleTimeout),
rd.DialReadTimeout(cfg.IdleTimeout),
rd.DialWriteTimeout(cfg.IdleTimeout),
}

if cfg.TLS.IsEnabled() && cfg.tlsConfig != nil {
dialOptions = append(dialOptions,
rd.DialUseTLS(true),
rd.DialTLSConfig(cfg.tlsConfig),
)
}

c, err := rd.Dial(cfg.Network, host, dialOptions...)
if err != nil {
return nil, err
}
if password != "" {
if _, err := c.Do("AUTH", password); err != nil {

if cfg.Password != "" {
if _, err := c.Do("AUTH", cfg.Password); err != nil {
c.Close()
return nil, err
}
}

return c, err
},
}
Expand Down
Loading