Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http_listener_v2): allows multiple paths and add path_tag #9529

Merged
merged 7 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ This is a sample configuration for the plugin.
## Address and port to host HTTP listener on
service_address = ":8080"

## Path to listen to.
# path = "/telegraf"
## Paths to listen to.
# paths = ["/telegraf"]
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## Save path in path_tag
## Do not include path in tag if path_tag is an empty string
# path_tag = ""
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## HTTP methods to accept.
# methods = ["POST", "PUT"]
Expand Down
31 changes: 26 additions & 5 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
Path string `toml:"path"`
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
Paths []string `toml:"paths"`
PathTag string `toml:"path_tag"`
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
Methods []string `toml:"methods"`
DataSource string `toml:"data_source"`
ReadTimeout config.Duration `toml:"read_timeout"`
Expand All @@ -63,8 +64,12 @@ const sampleConfig = `
## Address and port to host HTTP listener on
service_address = ":8080"

## Path to listen to.
# path = "/telegraf"
## Paths to listen to.
# paths = ["/telegraf"]

## Save path in path_tag
## Do not include path in tag if path_tag is an empty string
# path_tag = ""
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved

## HTTP methods to accept.
# methods = ["POST", "PUT"]
Expand Down Expand Up @@ -136,6 +141,8 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
h.WriteTimeout = config.Duration(time.Second * 10)
}

h.PathTag = strings.TrimSpace(h.PathTag)

h.acc = acc

tlsConf, err := h.ServerConfig.TLSConfig()
Expand Down Expand Up @@ -186,10 +193,20 @@ func (h *HTTPListenerV2) Stop() {
h.wg.Wait()
}

func (h *HTTPListenerV2) containsPath(needle string) bool {
for _, v := range h.Paths {
if v == needle {
return true
}
}

return false
}

sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
handler := h.serveWrite

if req.URL.Path != h.Path {
if !h.containsPath(req.URL.Path) {
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
handler = http.NotFound
}

Expand Down Expand Up @@ -251,6 +268,10 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
}
}

if h.PathTag != "" {
m.AddTag(h.PathTag, req.URL.Path)
}
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved

h.acc.AddMetric(m)
}

Expand Down Expand Up @@ -370,7 +391,7 @@ func init() {
return &HTTPListenerV2{
ServiceAddress: ":8080",
TimeFunc: time.Now,
Path: "/telegraf",
Paths: []string{"/telegraf"},
Methods: []string{"POST", "PUT"},
DataSource: body,
}
Expand Down
54 changes: 49 additions & 5 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newTestHTTPListenerV2() *HTTPListenerV2 {
listener := &HTTPListenerV2{
Log: testutil.Logger{},
ServiceAddress: "localhost:0",
Path: "/write",
Paths: []string{"/write"},
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
Methods: []string{"POST"},
Parser: parser,
TimeFunc: time.Now,
Expand All @@ -72,7 +72,7 @@ func newTestHTTPSListenerV2() *HTTPListenerV2 {
listener := &HTTPListenerV2{
Log: testutil.Logger{},
ServiceAddress: "localhost:0",
Path: "/write",
Paths: []string{"/write"},
Methods: []string{"POST"},
Parser: parser,
ServerConfig: *pki.TLSServerConfig(),
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestInvalidListenerConfig(t *testing.T) {
listener := &HTTPListenerV2{
Log: testutil.Logger{},
ServiceAddress: "address_without_port",
Path: "/write",
Paths: []string{"/write"},
Methods: []string{"POST"},
Parser: parser,
TimeFunc: time.Now,
Expand Down Expand Up @@ -230,6 +230,50 @@ func TestWriteHTTP(t *testing.T) {
)
}

// http listener should add request path as configured path_tag
func TestWriteHTTPWithPathTag(t *testing.T) {
listener := newTestHTTPListenerV2()
listener.PathTag = "path"

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()

// post single message to listener
resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01", "path": "/write"},
)
}

// http listener should add request path as configured path_tag (trimming it before)
func TestWriteHTTPWithWhiteSpacesPathTag(t *testing.T) {
listener := newTestHTTPListenerV2()
listener.PathTag = " path "

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()

// post single message to listener
resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01", "path": "/write"},
)
}

// http listener should add a newline at the end of the buffer if it's not there
func TestWriteHTTPNoNewline(t *testing.T) {
listener := newTestHTTPListenerV2()
Expand Down Expand Up @@ -257,7 +301,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
listener := &HTTPListenerV2{
Log: testutil.Logger{},
ServiceAddress: "localhost:0",
Path: "/write",
Paths: []string{"/write"},
Methods: []string{"POST"},
Parser: parser,
MaxBodySize: config.Size(len(hugeMetric)),
Expand All @@ -280,7 +324,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
listener := &HTTPListenerV2{
Log: testutil.Logger{},
ServiceAddress: "localhost:0",
Path: "/write",
Paths: []string{"/write"},
Methods: []string{"POST"},
Parser: parser,
MaxBodySize: config.Size(4096),
Expand Down