Skip to content

Commit

Permalink
[7.5] [Metricbeat] Convert millis-since-epoch timestamps in `elastics…
Browse files Browse the repository at this point in the history
…earch/ml_job` metricset to ints (#14222) (#14293)

* [Metricbeat] Convert millis-since-epoch timestamps in `elasticsearch/ml_job` metricset to ints (#14222)

* Convert timestamp field to int from float64

* Reformatting comment to be less wide

* Fixing up comment

* Adding unit tests

* Adding CHANGELOG entry

* Fixing up CHANGELOG
  • Loading branch information
ycombinator authored Oct 29, 2019
1 parent 41c2b09 commit 3a31511
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Metricbeat*

- Convert indexed ms-since-epoch timestamp fields in `elasticsearch/ml_job` metricset to ints from float64s. {issue}14220[14220] {pull}14222[14222]

*Packetbeat*

Expand Down
21 changes: 21 additions & 0 deletions metricbeat/helper/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,24 @@ func ReportAndLogError(err error, r mb.ReporterV2, l *logp.Logger) {
r.Error(err)
l.Error(err)
}

// FixTimestampField converts the given timestamp field in the given map from a float64 to an
// int, so that it is not serialized in scientific notation in the event. This is because
// Elasticsearch cannot accepts scientific notation to represent millis-since-epoch values
// for it's date fields: https://github.com/elastic/elasticsearch/pull/36691
func FixTimestampField(m common.MapStr, field string) error {
v, err := m.GetValue(field)
if err == common.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}

switch vv := v.(type) {
case float64:
_, err := m.Put(field, int(vv))
return err
}
return nil
}
53 changes: 53 additions & 0 deletions metricbeat/helper/elastic/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,56 @@ func TestReportErrorForMissingField(t *testing.T) {
assert.Equal(t, expectedError, err)
assert.Equal(t, expectedError, currentErr)
}

func TestFixTimestampField(t *testing.T) {
tests := []struct {
Name string
OriginalValue map[string]interface{}
ExpectedValue map[string]interface{}
}{
{
"converts float64s in scientific notation to ints",
map[string]interface{}{
"foo": 1.571284349E12,
},
map[string]interface{}{
"foo": 1571284349000,
},
},
{
"converts regular notation float64s to ints",
map[string]interface{}{
"foo": float64(1234),
},
map[string]interface{}{
"foo": 1234,
},
},
{
"ignores missing fields",
map[string]interface{}{
"bar": 12345,
},
map[string]interface{}{
"bar": 12345,
},
},
{
"leaves strings untouched",
map[string]interface{}{
"foo": "bar",
},
map[string]interface{}{
"foo": "bar",
},
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
err := FixTimestampField(test.OriginalValue, "foo")
assert.NoError(t, err)
assert.Equal(t, test.ExpectedValue, test.OriginalValue)
})
}
}
13 changes: 11 additions & 2 deletions metricbeat/module/elasticsearch/ml_job/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,22 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
}

var errs multierror.Errors
for _, job := range jobsArr {
job, ok = job.(map[string]interface{})
for _, j := range jobsArr {
job, ok := j.(map[string]interface{})
if !ok {
errs = append(errs, fmt.Errorf("job is not a map"))
continue
}

if err := elastic.FixTimestampField(job, "data_counts.earliest_record_timestamp"); err != nil {
errs = append(errs, err)
continue
}
if err := elastic.FixTimestampField(job, "data_counts.latest_record_timestamp"); err != nil {
errs = append(errs, err)
continue
}

event := mb.Event{}
event.RootFields = common.MapStr{
"cluster_uuid": info.ClusterID,
Expand Down

0 comments on commit 3a31511

Please sign in to comment.