improve efficiency of getting harvest data by avoiding memAlloc
iamemilio committed Jun 14, 2022
1 parent 8bd5634 commit ea253d7
Showing 23 changed files with 431 additions and 174 deletions.
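In outline, the commit replaces each payload type's one-shot Data method, which allocated and returned a fresh []byte on every harvest, with a DataBuffer/WriteData pair that appends into a caller-supplied *bytes.Buffer, so the caller decides when, and whether, a buffer gets allocated. Below is a minimal, self-contained sketch of the new writing style; writeEventJSON and its payload are illustrative stand-ins, not the agent's actual code:

package main

import (
	"bytes"
	"fmt"
)

// writeEventJSON mimics the new WriteData style: it appends into a
// caller-owned buffer instead of allocating and returning a []byte.
func writeEventJSON(buf *bytes.Buffer, agentRunID string) error {
	buf.WriteByte('[')
	buf.WriteString(`"` + agentRunID + `"`)
	buf.WriteByte(']')
	return nil
}

func main() {
	// The caller sizes the buffer once, up front.
	buf := bytes.NewBuffer(make([]byte, 0, 256))
	if err := writeEventJSON(buf, "12345"); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println(buf.String()) // ["12345"]
}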
12 changes: 4 additions & 8 deletions v3/newrelic/analytics_events.go
@@ -91,14 +91,11 @@ func (events *analyticsEvents) Merge(other *analyticsEvents) {
 	events.numSeen = allSeen
 }
 
-func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error) {
-	if 0 == len(events.events) {
-		return nil, nil
+func (events *analyticsEvents) CollectorJSON(buf *bytes.Buffer, agentRunID string) error {
+	if buf == nil || events.NumSaved() == 0 {
+		return nil
 	}
 
-	estimate := 256 * len(events.events)
-	buf := bytes.NewBuffer(make([]byte, 0, estimate))
-
 	buf.WriteByte('[')
 	jsonx.AppendString(buf, agentRunID)
 	buf.WriteByte(',')
@@ -120,8 +117,7 @@ func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error)
 	buf.WriteByte(']')
 	buf.WriteByte(']')
 
-	return buf.Bytes(), nil
-
+	return nil
 }
 
 // split splits the events into two. NOTE! The two event pools are not valid
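The allocation difference is easiest to see under testing.B with ReportAllocs. The benchmark below is a standalone sketch saved in its own _test.go file, not the agent's analyticsEventBenchmarkHelper: it contrasts allocating a fresh buffer per payload, which the old CollectorJSON signature forced, with resetting and reusing one caller-owned buffer, which the new signature permits. That reuse is an extrapolation of the commit's stated goal, not a pattern shown in this diff.

package payload

import (
	"bytes"
	"testing"
)

// buildPayload stands in for CollectorJSON's sequence of buffer writes.
func buildPayload(buf *bytes.Buffer) {
	buf.WriteByte('[')
	buf.WriteString(`"12345",{"reservoir_size":10,"events_seen":3}`)
	buf.WriteByte(']')
}

// BenchmarkFreshBuffer allocates a new buffer every iteration.
func BenchmarkFreshBuffer(b *testing.B) {
	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		buf := bytes.NewBuffer(make([]byte, 0, 256))
		buildPayload(buf)
	}
}

// BenchmarkReusedBuffer resets and refills a single buffer.
func BenchmarkReusedBuffer(b *testing.B) {
	buf := bytes.NewBuffer(make([]byte, 0, 256))
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		buf.Reset()
		buildPayload(buf)
	}
}

Run with go test -bench=Buffer -benchmem; the reused variant should report zero allocations per operation once the buffer is warm.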
88 changes: 68 additions & 20 deletions v3/newrelic/analytics_events_test.go
@@ -35,11 +35,14 @@ func TestBasic(t *testing.T) {
 	events.addEvent(sampleAnalyticsEvent(0.5))
 	events.addEvent(sampleAnalyticsEvent(0.5))
 
-	json, err := events.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := events.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
 
+	json := buf.Bytes()
+
 	expected := `["12345",{"reservoir_size":10,"events_seen":3},[0.5,0.5,0.5]]`
 
 	if string(json) != expected {
@@ -55,17 +58,19 @@ func TestBasic(t *testing.T) {
 
 func TestEmpty(t *testing.T) {
 	events := newAnalyticsEvents(10)
-	json, err := events.CollectorJSON(agentRunID)
+	var buf *bytes.Buffer
+	err := events.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
-	if nil != json {
-		t.Error(string(json))
+
+	if buf != nil {
+		t.Error(string(buf.Bytes()))
 	}
-	if 0 != events.numSeen {
+	if events.numSeen != 0 {
 		t.Error(events.numSeen)
 	}
-	if 0 != events.NumSaved() {
+	if events.NumSaved() != 0 {
 		t.Error(events.NumSaved())
 	}
 }
@@ -79,10 +84,13 @@ func TestSampling(t *testing.T) {
 	events.addEvent(sampleAnalyticsEvent(0.8))
 	events.addEvent(sampleAnalyticsEvent(0.3))
 
-	json, err := events.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := events.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
+
+	json := buf.Bytes()
 	if string(json) != `["12345",{"reservoir_size":3,"events_seen":6},[0.8,0.999999,0.9]]` {
 		t.Error(string(json))
 	}
@@ -98,10 +106,14 @@ func TestMergeEmpty(t *testing.T) {
 	e1 := newAnalyticsEvents(10)
 	e2 := newAnalyticsEvents(10)
 	e1.Merge(e2)
-	json, err := e1.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := e1.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
+
+	json := buf.Bytes()
+
 	if nil != json {
 		t.Error(string(json))
 	}
@@ -127,10 +139,13 @@ func TestMergeFull(t *testing.T) {
 	e2.addEvent(sampleAnalyticsEvent(0.24))
 
 	e1.Merge(e2)
-	json, err := e1.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := e1.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
+
+	json := buf.Bytes()
 	if string(json) != `["12345",{"reservoir_size":2,"events_seen":7},[0.24,0.25]]` {
 		t.Error(string(json))
 	}
@@ -157,10 +172,13 @@ func TestAnalyticsEventMergeFailedSuccess(t *testing.T) {
 
 	e1.mergeFailed(e2)
 
-	json, err := e1.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := e1.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
+
+	json := buf.Bytes()
 	if string(json) != `["12345",{"reservoir_size":2,"events_seen":7},[0.24,0.25]]` {
 		t.Error(string(json))
 	}
@@ -192,10 +210,13 @@ func TestAnalyticsEventMergeFailedLimitReached(t *testing.T) {
 
 	e1.mergeFailed(e2)
 
-	json, err := e1.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err := e1.CollectorJSON(buf, agentRunID)
 	if nil != err {
 		t.Fatal(err)
 	}
+
+	json := buf.Bytes()
 	if string(json) != `["12345",{"reservoir_size":2,"events_seen":3},[0.15,0.25]]` {
 		t.Error(string(json))
 	}
@@ -221,9 +242,10 @@ func analyticsEventBenchmarkHelper(b *testing.B, w jsonWriter) {
 	b.ResetTimer()
 
 	for n := 0; n < b.N; n++ {
-		js, err := events.CollectorJSON(agentRunID)
+		buf := &bytes.Buffer{}
+		err := events.CollectorJSON(buf, agentRunID)
 		if nil != err {
-			b.Fatal(err, js)
+			b.Fatal(err)
 		}
 	}
 }
@@ -279,8 +301,14 @@ func TestSplitFull(t *testing.T) {
 		t.Error(events.capacity())
 	}
 	e1, e2 := events.split()
-	j1, err1 := e1.CollectorJSON(agentRunID)
-	j2, err2 := e2.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err1 := e1.CollectorJSON(buf, agentRunID)
+	j1 := buf.Bytes()
+
+	buf = &bytes.Buffer{}
+	err2 := e2.CollectorJSON(buf, agentRunID)
+	j2 := buf.Bytes()
+
 	if err1 != nil || err2 != nil {
 		t.Fatal(err1, err2)
 	}
@@ -298,8 +326,14 @@ func TestSplitNotFullOdd(t *testing.T) {
 		events.addEvent(sampleAnalyticsEvent(priority(float32(i) / 10.0)))
 	}
 	e1, e2 := events.split()
-	j1, err1 := e1.CollectorJSON(agentRunID)
-	j2, err2 := e2.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err1 := e1.CollectorJSON(buf, agentRunID)
+	j1 := buf.Bytes()
+
+	buf = &bytes.Buffer{}
+	err2 := e2.CollectorJSON(buf, agentRunID)
+	j2 := buf.Bytes()
+
 	if err1 != nil || err2 != nil {
 		t.Fatal(err1, err2)
 	}
@@ -317,8 +351,14 @@ func TestSplitNotFullEven(t *testing.T) {
 		events.addEvent(sampleAnalyticsEvent(priority(float32(i) / 10.0)))
 	}
 	e1, e2 := events.split()
-	j1, err1 := e1.CollectorJSON(agentRunID)
-	j2, err2 := e2.CollectorJSON(agentRunID)
+	buf := &bytes.Buffer{}
+	err1 := e1.CollectorJSON(buf, agentRunID)
+	j1 := buf.Bytes()
+
+	buf = &bytes.Buffer{}
+	err2 := e2.CollectorJSON(buf, agentRunID)
+	j2 := buf.Bytes()
+
 	if err1 != nil || err2 != nil {
 		t.Fatal(err1, err2)
 	}
@@ -341,7 +381,15 @@ func TestAnalyticsEventsZeroCapacity(t *testing.T) {
 	if 1 != events.NumSeen() || 0 != events.NumSaved() || 0 != events.capacity() {
 		t.Error(events.NumSeen(), events.NumSaved(), events.capacity())
 	}
-	js, err := events.CollectorJSON("agentRunID")
+
+	data := &bytes.Buffer{}
+	err := events.CollectorJSON(data, agentRunID)
+
+	var js []byte
+	if data != nil {
+		js = data.Bytes()
+	}
+
 	if err != nil || js != nil {
 		t.Error(err, string(js))
 	}
18 changes: 15 additions & 3 deletions v3/newrelic/custom_events.go
@@ -3,7 +3,10 @@
 
 package newrelic
 
-import "time"
+import (
+	"bytes"
+	"time"
+)
 
 type customEvents struct {
 	*analyticsEvents
@@ -27,8 +30,17 @@ func (cs *customEvents) MergeIntoHarvest(h *harvest) {
 	h.CustomEvents.mergeFailed(cs.analyticsEvents)
 }
 
-func (cs *customEvents) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
-	return cs.CollectorJSON(agentRunID)
+func (cs *customEvents) DataBuffer() *bytes.Buffer {
+	if len(cs.events) == 0 {
+		return nil
+	}
+
+	estimate := 256 * len(cs.events)
+	return bytes.NewBuffer(make([]byte, 0, estimate))
+}
+
+func (cs *customEvents) WriteData(buf *bytes.Buffer, agentRunID string, harvestStart time.Time) error {
+	return cs.CollectorJSON(buf, agentRunID)
 }
 
 func (cs *customEvents) EndpointMethod() string {
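The sizing heuristic in DataBuffer is the same 256-bytes-per-event estimate the old CollectorJSON computed internally, now hoisted out so that an empty payload never allocates at all. Below is a standalone sketch of just that rule; naming the estimate as a constant is a choice made here for illustration, not the agent's code:

package main

import (
	"bytes"
	"fmt"
)

// bytesPerEvent mirrors the 256-byte estimate used in the diff above.
const bytesPerEvent = 256

// newEventBuffer returns nil when there is nothing to encode, letting
// the caller skip the harvest POST without ever allocating a buffer.
func newEventBuffer(numEvents int) *bytes.Buffer {
	if numEvents == 0 {
		return nil
	}
	return bytes.NewBuffer(make([]byte, 0, bytesPerEvent*numEvents))
}

func main() {
	fmt.Println(newEventBuffer(0))       // <nil>
	fmt.Println(newEventBuffer(8).Cap()) // 2048
}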
13 changes: 11 additions & 2 deletions v3/newrelic/error_events.go
@@ -61,8 +61,17 @@ func (events *errorEvents) MergeIntoHarvest(h *harvest) {
 	h.ErrorEvents.mergeFailed(events.analyticsEvents)
 }
 
-func (events *errorEvents) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
-	return events.CollectorJSON(agentRunID)
+func (events *errorEvents) DataBuffer() *bytes.Buffer {
+	if 0 == len(events.events) {
+		return nil
+	}
+
+	estimate := 256 * len(events.events)
+	return bytes.NewBuffer(make([]byte, 0, estimate))
+}
+
+func (events *errorEvents) WriteData(buf *bytes.Buffer, agentRunID string, harvestStart time.Time) error {
+	return events.CollectorJSON(buf, agentRunID)
 }
 
 func (events *errorEvents) EndpointMethod() string {
14 changes: 10 additions & 4 deletions v3/newrelic/errors_from_internal.go
@@ -151,12 +151,18 @@ func mergeTxnErrors(errors *harvestErrors, errs txnErrors, txnEvent txnEvent) {
 	}
 }
 
-func (errors harvestErrors) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
+func (errors harvestErrors) DataBuffer() *bytes.Buffer {
 	if 0 == len(errors) {
-		return nil, nil
+		return nil
 	}
 	estimate := 1024 * len(errors)
-	buf := bytes.NewBuffer(make([]byte, 0, estimate))
+	return bytes.NewBuffer(make([]byte, 0, estimate))
+}
+
+func (errors harvestErrors) WriteData(buf *bytes.Buffer, agentRunID string, harvestStart time.Time) error {
+	if buf == nil {
+		return nil
+	}
 	buf.WriteByte('[')
 	jsonx.AppendString(buf, agentRunID)
 	buf.WriteByte(',')
@@ -169,7 +175,7 @@ func (errors harvestErrors) Data(agentRunID string, harvestStart time.Time) ([]b
 	}
 	buf.WriteByte(']')
 	buf.WriteByte(']')
-	return buf.Bytes(), nil
+	return nil
 }
 
 func (errors harvestErrors) MergeIntoHarvest(h *harvest) {}
9 changes: 7 additions & 2 deletions v3/newrelic/errors_test.go
@@ -239,7 +239,9 @@ func TestErrorsLifecycle(t *testing.T) {
 		},
 		TotalTime: 2 * time.Second,
 	})
-	js, err := he.Data("agentRunID", time.Now())
+	buf := he.DataBuffer()
+	err := he.WriteData(buf, "agentRunID", time.Now())
+	js := buf.Bytes()
 	if nil != err {
 		t.Error(err)
 	}
@@ -350,7 +352,10 @@ func BenchmarkErrorsJSON(b *testing.B) {
 	b.ResetTimer()
 
 	for n := 0; n < b.N; n++ {
-		js, err := he.Data("agentRundID", when)
+		buf := he.DataBuffer()
+		err := he.WriteData(buf, "agentRundID", when)
+		js := buf.Bytes()
+
 		if nil != err || nil == js {
 			b.Fatal(err, js)
 		}
6 changes: 4 additions & 2 deletions v3/newrelic/expect_implementation.go
@@ -476,12 +476,14 @@ func expectTxnTraces(v internal.Validator, traces *harvestTraces, want []interna
 	if len(want) == 0 {
 		return
 	}
-	js, err := traces.Data("agentRunID", time.Now())
-	if nil != err {
+	data := traces.DataBuffer()
+	err := traces.WriteData(data, "agentRunID", time.Now())
+	if err != nil {
 		v.Error("error creasing harvest traces data", err)
 		return
 	}
 
+	js := data.Bytes()
 	var unmarshalled []interface{}
 	err = json.Unmarshal(js, &unmarshalled)
 	if nil != err {
5 changes: 4 additions & 1 deletion v3/newrelic/harvest.go
@@ -4,6 +4,7 @@
 package newrelic
 
 import (
+	"bytes"
 	"time"
 
 	"github.com/newrelic/go-agent/v3/internal"
@@ -257,10 +258,12 @@ type payloadCreator interface {
 	// intermittent collector issue) the payload may be merged into the next
 	// time period's harvest.
 	harvestable
+
+	DataBuffer() *bytes.Buffer
 	// Data prepares JSON in the format expected by the collector endpoint.
 	// This method should return (nil, nil) if the payload is empty and no
 	// rpm request is necessary.
-	Data(agentRunID string, harvestStart time.Time) ([]byte, error)
+	WriteData(buffer *bytes.Buffer, agentRunID string, harvestStart time.Time) error
 	// EndpointMethod is used for the "method" query parameter when posting
 	// the data.
 	EndpointMethod() string
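Putting the two interface methods together, a caller might drive a payload as in the sketch below. The collect helper is hypothetical, since the harvest loop that actually consumes payloadCreator is not part of this diff; it shows how a nil result from DataBuffer short-circuits both the allocation and the rpm request:

package sketch

import (
	"bytes"
	"time"
)

// payloadCreator repeats the two methods added to the interface above.
type payloadCreator interface {
	DataBuffer() *bytes.Buffer
	WriteData(buf *bytes.Buffer, agentRunID string, harvestStart time.Time) error
}

// collect is a hypothetical caller, not agent code.
func collect(p payloadCreator, runID string, start time.Time) ([]byte, error) {
	buf := p.DataBuffer()
	if buf == nil {
		return nil, nil // empty payload: nothing to send this harvest
	}
	if err := p.WriteData(buf, runID, start); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}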