Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue when response contains ANSI escape sequences #38

Merged
merged 6 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## tip

* BUGFIX: fix bug with parsing response when one of the field contains ANSI escape sequences. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/24).

## v0.2.3

* BUGFIX: fix bug with displaying response when one of the stream field is defined and lines are not collected. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/34).
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/unknwon/com v1.0.1 // indirect
github.com/unknwon/log v0.0.0-20150304194804-e617c87089d3 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ github.com/unknwon/log v0.0.0-20150304194804-e617c87089d3/go.mod h1:1xEUf2abjfP9
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
Expand Down
77 changes: 45 additions & 32 deletions pkg/plugin/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package plugin

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
Expand All @@ -9,6 +11,7 @@ import (
"github.com/VictoriaMetrics/metricsql"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/valyala/fastjson"

"github.com/VictoriaMetrics/victorialogs-datasource/pkg/utils"
)
Expand All @@ -25,10 +28,6 @@ const (
gLineField = "Line"
)

// Response contains fields from query response
// It represents victoria logs response
type Response map[string]string

// parseStreamResponse reads data from the reader and collects
// fields and frame with necessary information
func parseStreamResponse(reader io.Reader) backend.DataResponse {
Expand All @@ -44,42 +43,56 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse {

labels := data.Labels{}

dec := json.NewDecoder(reader)
scanner := bufio.NewScanner(reader)

for dec.More() {
var r Response
err := dec.Decode(&r)
for scanner.Scan() {
value, err := fastjson.ParseBytes(scanner.Bytes())
if err != nil {
return newResponseError(fmt.Errorf("error decode response: %s", err), backend.StatusInternal)
}

for fieldName, value := range r {
switch fieldName {
case messageField:
lineField.Append(value)
case timeField:
getTime, err := utils.GetTime(value)
if err != nil {
return newResponseError(fmt.Errorf("error parse time from _time field: %s", err), backend.StatusInternal)
}
timeFd.Append(getTime)
case streamField:
expr, err := metricsql.Parse(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
if value.Exists(messageField) {
message := value.GetStringBytes(messageField)
lineField.Append(string(message))
}
if value.Exists(timeField) {
t := value.GetStringBytes(timeField)
getTime, err := utils.GetTime(string(t))
if err != nil {
return newResponseError(fmt.Errorf("error parse time from _time field: %s", err), backend.StatusInternal)
}
timeFd.Append(getTime)
}
if value.Exists(streamField) {
stream := value.GetStringBytes(streamField)
expr, err := metricsql.Parse(string(stream))
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
}
default:
labels[fieldName] = value
}
}

obj, err := value.Object()
if err != nil {
return newResponseError(fmt.Errorf("error get object from decoded response: %s", err), backend.StatusInternal)
}
obj.Visit(func(key []byte, v *fastjson.Value) {
if bytes.Equal(key, []byte(timeField)) ||
bytes.Equal(key, []byte(streamField)) ||
bytes.Equal(key, []byte(messageField)) {
return
}
fieldName := string(key)
value := string(v.GetStringBytes())
labels[fieldName] = value
})

d, err := labelsToJSON(labels)
if err != nil {
return newResponseError(err, backend.StatusInternal)
Expand Down Expand Up @@ -120,10 +133,10 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse {
// labelsToJSON converts labels to json representation
// data.Labels when converted to JSON keep the fields sorted
func labelsToJSON(labels data.Labels) (json.RawMessage, error) {
bytes, err := json.Marshal(labels)
b, err := json.Marshal(labels)
if err != nil {
return nil, err
}

return bytes, nil
return b, nil
}
80 changes: 79 additions & 1 deletion pkg/plugin/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/valyala/fastjson"
)

func Test_parseStreamResponse(t *testing.T) {
Expand Down Expand Up @@ -44,7 +45,7 @@ func Test_parseStreamResponse(t *testing.T) {
name: "incorrect response",
response: "abcd",
want: func() backend.DataResponse {
return newResponseError(fmt.Errorf("error decode response: invalid character 'a' looking for beginning of value"), backend.StatusInternal)
return newResponseError(fmt.Errorf("error decode response: cannot parse JSON: cannot parse number: unexpected char: \"a\"; unparsed tail: \"abcd\""), backend.StatusInternal)
},
},
{
Expand Down Expand Up @@ -241,6 +242,83 @@ func Test_parseStreamResponse(t *testing.T) {
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
{
name: "response has ANSI chars",
response: `{"_time":"2024-06-26T13:15:15.000Z","_stream_id":"00000000000000009eaf29866f70976a098adc735393deb1","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} ","compose_project":"app","compose_service":"gateway"}`,
want: func() backend.DataResponse {
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = gLabelsField

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = gTimeField

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = gLineField

timeFd.Append(time.Date(2024, 06, 26, 13, 15, 15, 0, time.UTC))

lineField.Append(`\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} `)

labels := data.Labels{
"compose_project": "app",
"compose_service": "gateway",
"_stream_id": "00000000000000009eaf29866f70976a098adc735393deb1",
}

b, _ := labelsToJSON(labels)
labelsField.Append(b)

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
{
name: "response has unicode",
response: `{"_time":"2024-06-26T13:20:34.000Z","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\u001b[2m2024-06-26T13:20:34.608Z\u001b[0;39m \u001b[33m WARN\u001b[0;39m \u001b[35m1\u001b[0;39m \u001b[2m---\u001b[0;39m \u001b[2m[ main]\u001b[0;39m \u001b[36mjakarta.persistence.spi \u001b[0;39m \u001b[2m:\u001b[0;39m jakarta.persistence.spi::No valid providers found. ","compose_project":"app","compose_service":"gateway"}`,
want: func() backend.DataResponse {
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = gLabelsField

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = gTimeField

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = gLineField

timeFd.Append(time.Date(2024, 06, 26, 13, 20, 34, 0, time.UTC))

value, err := fastjson.Parse(`{"_msg":"\u001b[2m2024-06-26T13:20:34.608Z\u001b[0;39m \u001b[33m WARN\u001b[0;39m \u001b[35m1\u001b[0;39m \u001b[2m---\u001b[0;39m \u001b[2m[ main]\u001b[0;39m \u001b[36mjakarta.persistence.spi \u001b[0;39m \u001b[2m:\u001b[0;39m jakarta.persistence.spi::No valid providers found. "}`)
if err != nil {
t.Fatalf("error decode response: %s", err)
}

if value.Exists(messageField) {
message := value.GetStringBytes(messageField)
lineField.Append(string(message))
}

labels := data.Labels{
"compose_project": "app",
"compose_service": "gateway",
}

b, _ := labelsToJSON(labels)
labelsField.Append(b)

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
Expand Down
Loading