Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop time when used as a tag or field key #7132

Merged
merged 1 commit into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v1.1.0 [unreleased]

### Bugfixes

- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key.

## v1.0.0 [unreleased]

### Breaking changes
Expand Down
7 changes: 7 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO

## v2

TODO list for v2. Here is a list of things we want to add to v1, but can't because they would be a breaking change.

- [#1834](https://github.com/influxdata/influxdb/issues/1834): Disallow using time as a tag key or field key.
24 changes: 21 additions & 3 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,24 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,

// get the shard mutex for locally defined fields
for _, p := range points {
// verify the tags and fields
tags := p.Tags()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the right place to reject this as it will fail the whole batch instead of the point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would also have that issue with the max series error. We may want to look into that again if we have the time.

if _, ok := tags["time"]; ok {
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
delete(tags, "time")
p.SetTags(tags)
}

fields := p.Fields()
if _, ok := fields["time"]; ok {
s.logger.Printf("dropping field 'time' from '%s'\n", p.PrecisionString(""))
delete(fields, "time")

if len(fields) == 0 {
continue
}
}

// see if the series should be added to the index
key := string(p.Key())
ss := s.index.Series(key)
Expand All @@ -455,7 +473,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
return nil, fmt.Errorf("max series per database exceeded: %s", key)
}

ss = NewSeries(key, p.Tags())
ss = NewSeries(key, tags)
atomic.AddInt64(&s.stats.SeriesCreated, 1)
}

Expand All @@ -466,14 +484,14 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
mf := s.engine.MeasurementFields(p.Name())

if mf == nil {
for name, value := range p.Fields() {
for name, value := range fields {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
}
continue // skip validation since all fields are new
}

// validate field types and encode data
for name, value := range p.Fields() {
for name, value := range fields {
if f := mf.Field(name); f != nil {
// Field present in shard metadata, make sure there is no type conflict.
if f.Type != influxql.InspectDataType(value) {
Expand Down
104 changes: 103 additions & 1 deletion tsdb/shard_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsdb_test

import (
"bytes"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -144,12 +145,113 @@ func TestMaxSeriesLimit(t *testing.T) {
if err == nil {
t.Fatal("expected error")
} else if err.Error() != "max series per database exceeded: cpu,host=server9999" {
t.Fatalf("unexpected error messag:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error())
t.Fatalf("unexpected error message:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error())
}

sh.Close()
}

func TestWriteTimeTag(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")

index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()

pt := models.MustNewPoint(
"cpu",
map[string]string{},
map[string]interface{}{"time": 1.0},
time.Unix(1, 2),
)

buf := bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}

m := index.Measurement("cpu")
if m != nil {
t.Fatal("unexpected cpu measurement")
}

pt = models.MustNewPoint(
"cpu",
map[string]string{},
map[string]interface{}{"value": 1.0, "time": 1.0},
time.Unix(1, 2),
)

buf = bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}

m = index.Measurement("cpu")
if m == nil {
t.Fatal("expected cpu measurement")
}

if got, exp := len(m.FieldNames()), 1; got != exp {
t.Fatalf("invalid number of field names: got=%v exp=%v", got, exp)
}
}

func TestWriteTimeField(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")

index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()

pt := models.MustNewPoint(
"cpu",
map[string]string{"time": "now"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)

buf := bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping tag 'time'"; !strings.Contains(got, exp) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}

key := models.MakeKey([]byte("cpu"), nil)
series := index.Series(string(key))
if series == nil {
t.Fatal("expected series")
} else if len(series.Tags) != 0 {
t.Fatalf("unexpected number of tags: got=%v exp=%v", len(series.Tags), 0)
}
}

func TestShardWriteAddNewField(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
Expand Down