Skip to content

Commit

Permalink
feat: escape tag strings (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor authored Sep 25, 2024
1 parent ea6f9d2 commit 68a0862
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.11.0 [unreleased]

1. [#105](https://github.com/InfluxCommunity/influxdb3-go/pull/105): Support newlines in tag values.

## 0.10.0 [2024-09-13]

### Features
Expand Down
53 changes: 53 additions & 0 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,21 @@ import (
"github.com/stretchr/testify/require"
)

func SkipCheck(t *testing.T) {

if _, present := os.LookupEnv("TESTING_INFLUXDB_URL"); !present {
t.Skip("TESTING_INFLUXDB_URL not set")
}
if _, present := os.LookupEnv("TESTING_INFLUXDB_TOKEN"); !present {
t.Skip("TESTING_INFLUXDB_TOKEN not set")
}
if _, present := os.LookupEnv("TESTING_INFLUXDB_DATABASE"); !present {
t.Skip("TESTING_INFLUXDB_DATABASE not set")
}
}

func TestWriteAndQueryExample(t *testing.T) {
SkipCheck(t)
now := time.Now().UTC()
testId := now.UnixNano()

Expand Down Expand Up @@ -153,6 +167,7 @@ func TestWriteAndQueryExample(t *testing.T) {
}

func TestQueryWithParameters(t *testing.T) {
SkipCheck(t)
now := time.Now().UTC()
testId := now.UnixNano()

Expand Down Expand Up @@ -221,6 +236,7 @@ func TestQueryWithParameters(t *testing.T) {
}

func TestQueryDatabaseDoesNotExist(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")

Expand All @@ -237,6 +253,7 @@ func TestQueryDatabaseDoesNotExist(t *testing.T) {
}

func TestQuerySchema(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")
Expand All @@ -253,6 +270,7 @@ func TestQuerySchema(t *testing.T) {
}

func TestQuerySchemaWithOptions(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")
Expand All @@ -269,6 +287,7 @@ func TestQuerySchemaWithOptions(t *testing.T) {
}

func TestQuerySchemaInfluxQL(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")
Expand All @@ -285,6 +304,7 @@ func TestQuerySchemaInfluxQL(t *testing.T) {
}

func TestWriteError(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")
Expand All @@ -308,3 +328,36 @@ func TestWriteError(t *testing.T) {
assert.NotNil(t, err.(*influxdb3.ServerError).Headers["X-Influxdb-Build"][0])

}

func TestEscapedStringValues(t *testing.T) {
SkipCheck(t)
url := os.Getenv("TESTING_INFLUXDB_URL")
token := os.Getenv("TESTING_INFLUXDB_TOKEN")
database := os.Getenv("TESTING_INFLUXDB_DATABASE")

client, err := influxdb3.New(influxdb3.ClientConfig{
Host: url,
Token: token,
Database: database,
})
require.NoError(t, err)
p := influxdb3.NewPoint("escapee",
map[string]string{
"tag1": "new\nline and space",
"tag2": "escaped\\nline and space",
},
map[string]interface{}{
"fVal": 41.3,
"sVal": "greetings\nearthlings",
}, time.Now())

err = client.WritePoints(context.Background(), []*influxdb3.Point{p})
require.NoError(t, err)
qit, err := client.Query(context.Background(), "SELECT * FROM \"escapee\" WHERE time >= now() - interval '1 minute'")
require.NoError(t, err)
for qit.Next() {
assert.EqualValues(t, "greetings\\nearthlings", qit.Value()["sVal"])
assert.EqualValues(t, "new\\nline and space", qit.Value()["tag1"])
assert.EqualValues(t, "escaped\\nline and space", qit.Value()["tag2"])
}
}
8 changes: 6 additions & 2 deletions influxdb3/point.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/influxdata/line-protocol/v2/lineprotocol"
Expand Down Expand Up @@ -268,10 +269,13 @@ func (p *Point) MarshalBinaryWithDefaultTags(precision lineprotocol.Precision, d
}
lastKey = tagKey

// N.B. Some customers have requested support for newline chars in tag values (EAR 5476)
// Though this is outside the lineprotocol specification, it was supported in
// previous GO client versions.
if value, ok := p.Values.Tags[tagKey]; ok {
enc.AddTag(tagKey, value)
enc.AddTag(tagKey, strings.ReplaceAll(value, "\n", "\\n"))
} else {
enc.AddTag(tagKey, defaultTags[tagKey])
enc.AddTag(tagKey, strings.ReplaceAll(defaultTags[tagKey], "\n", "\\n"))
}
}

Expand Down
43 changes: 43 additions & 0 deletions influxdb3/point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,49 @@ func TestPointDefaultTags(t *testing.T) {
assert.EqualValues(t, `test,tag1=a,tag2=b,tag3=f float64=80.1234567 60000000070`+"\n", string(line))
}

func TestPointWithNewlineTags(t *testing.T) {
p := NewPoint("test",
map[string]string{
"tag1": "new\nline and space",
"tag2": "escaped\\nline and space",
"ambiTag": "ambiguous\ntag",
},
map[string]interface{}{
"fVal": 41.3,
}, time.Unix(60, 70))

defaultTags := map[string]string{
"defTag1": "default\nline and space",
"defTag2": "escaped\\ndefault line and space",
"ambiTag": "default\nambiguous\ntag",
}

line, err := p.MarshalBinary(lineprotocol.Nanosecond)
require.NoError(t, err)
assert.EqualValues(t,
"test,ambiTag=ambiguous\\ntag,tag1=new\\nline\\ and\\ space,tag2=escaped\\nline\\ and\\ space "+
"fVal=41.3 60000000070\n",
string(line))

line, err = p.MarshalBinaryWithDefaultTags(lineprotocol.Nanosecond, defaultTags)
require.NoError(t, err)
assert.EqualValues(t,
"test,ambiTag=ambiguous\\ntag,defTag1=default\\nline\\ and\\ space,defTag2=escaped"+
"\\ndefault\\ line\\ and\\ space,tag1=new\\nline\\ and\\ space,tag2=escaped\\nline\\ and\\ space "+
"fVal=41.3 60000000070\n",
string(line))

pInvalid := NewPoint("test", map[string]string{
"tag\nbroken": "tag\nvalue with space",
}, map[string]interface{}{
"fVal": 17.2,
}, time.Unix(60, 70))

_, err = pInvalid.MarshalBinary(lineprotocol.Nanosecond)
require.Error(t, err)
assert.EqualValues(t, "encoding error: invalid tag key \"tag\\nbroken\"", err.Error())
}

func TestPointFields(t *testing.T) {
p := NewPoint("test", nil, map[string]interface{}{
"field1": 10,
Expand Down

0 comments on commit 68a0862

Please sign in to comment.