Skip to content

Commit

Permalink
use AddError everywhere (influxdata#2372)
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer authored and Vladislav Mugultyanov (Lazada Group) committed May 30, 2017
1 parent 920578a commit 7f58537
Show file tree
Hide file tree
Showing 95 changed files with 341 additions and 531 deletions.
37 changes: 0 additions & 37 deletions internal/errchan/errchan.go

This file was deleted.

6 changes: 2 additions & 4 deletions plugins/inputs/aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"

as "github.com/aerospike/aerospike-client-go"
Expand Down Expand Up @@ -41,17 +40,16 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
}

var wg sync.WaitGroup
errChan := errchan.New(len(a.Servers))
wg.Add(len(a.Servers))
for _, server := range a.Servers {
go func(serv string) {
defer wg.Done()
errChan.C <- a.gatherServer(serv, acc)
acc.AddError(a.gatherServer(serv, acc))
}(server)
}

wg.Wait()
return errChan.Error()
return nil
}

func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
Expand Down
5 changes: 2 additions & 3 deletions plugins/inputs/aerospike/aerospike_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestAerospikeStatistics(t *testing.T) {

var acc testutil.Accumulator

err := a.Gather(&acc)
err := acc.GatherError(a.Gather)
require.NoError(t, err)

assert.True(t, acc.HasMeasurement("aerospike_node"))
Expand All @@ -41,8 +41,7 @@ func TestAerospikeStatisticsPartialErr(t *testing.T) {

var acc testutil.Accumulator

err := a.Gather(&acc)
require.Error(t, err)
require.Error(t, acc.GatherError(a.Gather))

assert.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
Expand Down
22 changes: 9 additions & 13 deletions plugins/inputs/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -65,28 +66,23 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5
}

var outerr error
var errch = make(chan error)

var wg sync.WaitGroup
wg.Add(len(n.Urls))
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
return fmt.Errorf("Unable to parse address '%s': %s", u, err)
acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
continue
}

go func(addr *url.URL) {
errch <- n.gatherUrl(addr, acc)
defer wg.Done()
acc.AddError(n.gatherUrl(addr, acc))
}(addr)
}

// Drain channel, waiting for all requests to finish and save last error.
for range n.Urls {
if err := <-errch; err != nil {
outerr = err
}
}

return outerr
wg.Wait()
return nil
}

func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/apache/apache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestHTTPApache(t *testing.T) {
}

var acc testutil.Accumulator
err := a.Gather(&acc)
err := acc.GatherError(a.Gather)
require.NoError(t, err)

fields := map[string]interface{}{
Expand Down
26 changes: 15 additions & 11 deletions plugins/inputs/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -123,8 +122,8 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) {
}
j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
j.metric, out)
j.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
j.metric, out))
}
}

Expand Down Expand Up @@ -155,17 +154,17 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
addCassandraMetric(k, c, v.(map[string]interface{}))
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out))
return
}
} else {
if values, ok := out["value"]; ok {
addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
c, values.(map[string]interface{}))
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out))
return
}
}
Expand Down Expand Up @@ -274,25 +273,30 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
m = newCassandraMetric(serverTokens["host"], metric, acc)
} else {
// unsupported metric type
log.Printf("I! Unsupported Cassandra metric [%s], skipping",
metric)
acc.AddError(fmt.Errorf("E! Unsupported Cassandra metric [%s], skipping",
metric))
continue
}

// Prepare URL
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
serverTokens["port"] + context + metric)
if err != nil {
return err
acc.AddError(err)
continue
}
if serverTokens["user"] != "" && serverTokens["passwd"] != "" {
requestUrl.User = url.UserPassword(serverTokens["user"],
serverTokens["passwd"])
}

out, err := c.getAttr(requestUrl)
if err != nil {
acc.AddError(err)
continue
}
if out["status"] != 200.0 {
fmt.Printf("URL returned with status %v\n", out["status"])
acc.AddError(fmt.Errorf("URL returned with status %v\n", out["status"]))
continue
}
m.addTagsFields(out)
Expand Down
17 changes: 9 additions & 8 deletions plugins/inputs/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestHttpJsonJavaMultiValue(t *testing.T) {

var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
err := acc.GatherError(cassandra.Gather)

assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestHttpJsonJavaMultiType(t *testing.T) {

var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
err := acc.GatherError(cassandra.Gather)

assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
Expand All @@ -197,24 +197,25 @@ func TestHttpJsonJavaMultiType(t *testing.T) {
}

// Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) {
func TestHttp404(t *testing.T) {

jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers,
jolokia := genJolokiaClientStub(invalidJSON, 404, Servers,
[]string{HeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)
err := acc.GatherError(jolokia.Gather)

assert.Nil(t, err)
assert.Error(t, err)
assert.Equal(t, 0, len(acc.Metrics))
assert.Contains(t, err.Error(), "has status code 404")
}

// Test that the proper values are ignored or collected for class=Cassandra
func TestHttpJsonCassandraMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric})

var acc testutil.Accumulator
err := cassandra.Gather(&acc)
err := acc.GatherError(cassandra.Gather)

assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Metrics))
Expand Down Expand Up @@ -246,7 +247,7 @@ func TestHttpJsonCassandraNestedMultiValue(t *testing.T) {

var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
err := acc.GatherError(cassandra.Gather)

assert.Nil(t, err)
assert.Equal(t, 2, len(acc.Metrics))
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/ceph/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
for _, s := range sockets {
dump, err := perfDump(c.CephBinary, s)
if err != nil {
log.Printf("E! error reading from socket '%s': %v", s.socket, err)
acc.AddError(fmt.Errorf("E! error reading from socket '%s': %v", s.socket, err))
continue
}
data, err := parseDump(dump)
if err != nil {
log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err)
acc.AddError(fmt.Errorf("E! error parsing dump from socket '%s': %v", s.socket, err))
continue
}
for tag, metrics := range data {
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/cgroup/cgroup_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error {

for dir := range list {
if dir.err != nil {
return dir.err
acc.AddError(dir.err)
continue
}
if err := g.gatherDir(dir.path, acc); err != nil {
return err
acc.AddError(err)
}
}

Expand Down
12 changes: 6 additions & 6 deletions plugins/inputs/cgroup/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var cg1 = &CGroup{
func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator

err := cg1.Gather(&acc)
err := acc.GatherError(cg1.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand Down Expand Up @@ -56,7 +56,7 @@ var cg2 = &CGroup{
func TestCgroupStatistics_2(t *testing.T) {
var acc testutil.Accumulator

err := cg2.Gather(&acc)
err := acc.GatherError(cg2.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand All @@ -81,7 +81,7 @@ var cg3 = &CGroup{
func TestCgroupStatistics_3(t *testing.T) {
var acc testutil.Accumulator

err := cg3.Gather(&acc)
err := acc.GatherError(cg3.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand All @@ -108,7 +108,7 @@ var cg4 = &CGroup{
func TestCgroupStatistics_4(t *testing.T) {
var acc testutil.Accumulator

err := cg4.Gather(&acc)
err := acc.GatherError(cg4.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand Down Expand Up @@ -140,7 +140,7 @@ var cg5 = &CGroup{
func TestCgroupStatistics_5(t *testing.T) {
var acc testutil.Accumulator

err := cg5.Gather(&acc)
err := acc.GatherError(cg5.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand All @@ -167,7 +167,7 @@ var cg6 = &CGroup{
func TestCgroupStatistics_6(t *testing.T) {
var acc testutil.Accumulator

err := cg6.Gather(&acc)
err := acc.GatherError(cg6.Gather)
require.NoError(t, err)

tags := map[string]string{
Expand Down
Loading

0 comments on commit 7f58537

Please sign in to comment.