Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

Commit

Permalink
Ensure that riemann sink is resilient to backend outages.
Browse files Browse the repository at this point in the history
Signed-off-by: Vishnu kannan <[email protected]>
  • Loading branch information
vishh committed Nov 6, 2015
1 parent 6881269 commit 22a8ea0
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 58 deletions.
101 changes: 63 additions & 38 deletions sinks/riemann/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"reflect"
"runtime"
"strconv"
"time"

riemann_api "github.com/bigdatadev/goryman"
"github.com/golang/glog"
"k8s.io/heapster/extpoints"
sink_api "k8s.io/heapster/sinks/api"
"k8s.io/heapster/sinks/util"
kube_api "k8s.io/kubernetes/pkg/api"
)

Expand All @@ -34,6 +36,7 @@ import (
type riemannSink struct {
client riemannClient
config riemannConfig
ci util.ClientInitializer
}

type riemannConfig struct {
Expand Down Expand Up @@ -80,24 +83,26 @@ func CreateRiemannSink(uri *url.URL, _ extpoints.HeapsterConf) ([]sink_api.Exter
}
c.storeEvents = storeEvents
}
glog.V(4).Infof("Riemann sink URI: '%+v', host: '%+v', options: '%+v', ", uri, c.host, options)
var sink, err = new(func() riemannClient {
return riemann_api.NewGorymanClient(c.host)
}, c)
if err != nil {
return nil, err
glog.Infof("Riemann sink URI: '%+v', host: '%+v', options: '%+v', ", uri, c.host, options)
rs := &riemannSink{
client: riemann_api.NewGorymanClient(c.host),
config: c,
}
return []sink_api.ExternalSink{sink}, nil
rs.ci = util.NewClientInitializer("riemann", rs.setupRiemannClient, rs.ping, 10*time.Second)
runtime.SetFinalizer(rs.client, func(c riemannClient) { c.Close() })
return []sink_api.ExternalSink{rs}, nil
}

func (rs *riemannSink) setupRiemannClient() error {
return rs.client.Connect()
}

func new(getClient func() riemannClient, conf riemannConfig) (sink_api.ExternalSink, error) {
var client = getClient()
var err = client.Connect()
func (rs *riemannSink) ping() error {
_, err := rs.client.QueryEvents("false")
if err != nil {
return nil, err
rs.client.Close()
}
runtime.SetFinalizer(client, func(c riemannClient) { c.Close() })
return &riemannSink{client: client, config: conf}, nil
return err
}

// Abstracted for testing: this package works against any client that obeys the
Expand All @@ -107,9 +112,10 @@ type riemannClient interface {
Connect() error
Close() error
SendEvent(*riemann_api.Event) error
QueryEvents(string) ([]riemann_api.Event, error)
}

func (self *riemannSink) kubeEventToRiemannEvent(event kube_api.Event) (*riemann_api.Event, error) {
func (rs *riemannSink) kubeEventToRiemannEvent(event kube_api.Event) (*riemann_api.Event, error) {
glog.V(4).Infof("riemannSink.kubeEventToRiemannEvent(%+v)", event)

var rv = riemann_api.Event{
Expand All @@ -127,15 +133,15 @@ func (self *riemannSink) kubeEventToRiemannEvent(event kube_api.Event) (*riemann
Description: event.Reason + ": " + event.Message,
// Count: the number of times this event has occurred
Metric: event.Count,
Ttl: self.config.ttl,
State: self.config.state,
Tags: self.config.tags,
Ttl: rs.config.ttl,
State: rs.config.state,
Tags: rs.config.tags,
}
glog.V(4).Infof("Returning Riemann event: %+v", rv)
return &rv, nil
}

func (self *riemannSink) timeseriesToRiemannEvent(ts sink_api.Timeseries) (*riemann_api.Event, error) {
func (rs *riemannSink) timeseriesToRiemannEvent(ts sink_api.Timeseries) (*riemann_api.Event, error) {
glog.V(4).Infof("riemannSink.timeseriesToRiemannEvent(%+v)", ts)

var service string
Expand All @@ -152,9 +158,9 @@ func (self *riemannSink) timeseriesToRiemannEvent(ts sink_api.Timeseries) (*riem
Description: ts.MetricDescriptor.Description,
Attributes: ts.Point.Labels,
Metric: valueToMetric(ts.Point.Value),
Ttl: self.config.ttl,
State: self.config.state,
Tags: self.config.tags,
Ttl: rs.config.ttl,
State: rs.config.state,
Tags: rs.config.tags,
}
glog.V(4).Infof("Returning Riemann event: %+v", rv)
return &rv, nil
Expand All @@ -172,60 +178,79 @@ func valueToMetric(i interface{}) interface{} {
}
}

func (self *riemannSink) sendEvents(events []*riemann_api.Event) error {
// Error is the type of a parse error; it satisfies the error interface.
type Error string

func (e Error) Error() string {
return string(e)
}

func (rs *riemannSink) sendEvents(events []*riemann_api.Event) error {
var result error
for _, event := range events {
err := self.client.SendEvent(event)
defer func() {
if res := recover(); res != nil {
result = res.(Error)
}
}()
err := rs.client.SendEvent(event)
// FIXME handle multiple errors
if err != nil {
glog.Warningf("Failed sending event to Riemann: %+v: %+v", event, err)
glog.V(2).Infof("Failed sending event to Riemann: %+v: %+v", event, err)
result = err
}
}
return result
}

// Riemann does not pre-register metrics, so Register() is a no-op
func (self *riemannSink) Register(descriptor []sink_api.MetricDescriptor) error { return nil }
func (rs *riemannSink) Register(descriptor []sink_api.MetricDescriptor) error { return nil }

// Like Register
func (self *riemannSink) Unregister(metrics []sink_api.MetricDescriptor) error { return nil }
func (rs *riemannSink) Unregister(metrics []sink_api.MetricDescriptor) error { return nil }

// Send a collection of Timeseries to Riemann
func (self *riemannSink) StoreTimeseries(inputs []sink_api.Timeseries) error {
// glog.V(3).Infof("riemannSink.StoreTimeseries(%+v)", inputs)

func (rs *riemannSink) StoreTimeseries(inputs []sink_api.Timeseries) error {
if !rs.ci.Done() {
// Riemann backend isn't available.
glog.V(4).Infof("Skipping timeseries data. Riemann backend isn't available. ")
return nil
}
var events []*riemann_api.Event
for _, input := range inputs {
var event, err = self.timeseriesToRiemannEvent(input)
var event, err = rs.timeseriesToRiemannEvent(input)
if err != nil {
return err
}
events = append(events, event)
}
return self.sendEvents(events)
return rs.sendEvents(events)
}

// Send a collection of Kubernetes Events to Riemann
func (self *riemannSink) StoreEvents(inputs []kube_api.Event) error {
// glog.V(3).Infof("riemannSink.StoreEvents(%+v)", inputs)
if !self.config.storeEvents {
func (rs *riemannSink) StoreEvents(inputs []kube_api.Event) error {
if !rs.ci.Done() {
// Riemann backend isn't available.
glog.V(4).Infof("Skipping events data. Riemann backend isn't available. ")
return nil
}
if !rs.config.storeEvents {
return nil
}

var events []*riemann_api.Event
for _, input := range inputs {
var event, err = self.kubeEventToRiemannEvent(input)
var event, err = rs.kubeEventToRiemannEvent(input)
if err != nil {
return err
}
events = append(events, event)
}
return self.sendEvents(events)
return rs.sendEvents(events)
}

// Return debug information specific to Riemann
func (self *riemannSink) DebugInfo() string { return fmt.Sprintf("%s", self.client) }
func (rs *riemannSink) DebugInfo() string { return fmt.Sprintf("%s", rs.client) }

// Return a user-friendly string describing the sink
func (self *riemannSink) Name() string { return "Riemann" }
func (rs *riemannSink) Name() string { return "Riemann" }
31 changes: 11 additions & 20 deletions sinks/riemann/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,16 @@ type mockClient struct {
closed bool
}

func (self *mockClient) Connect() error { self.connected = true; return nil }
func (self *mockClient) Close() error { self.closed = true; return nil }
func (self *mockClient) SendEvent(*riemann_api.Event) error { return nil }
func getMock() riemannClient { return &mockClient{} }
func (rs *mockClient) Connect() error { rs.connected = true; return nil }
func (rs *mockClient) Close() error { rs.closed = true; return nil }
func (rs *mockClient) SendEvent(*riemann_api.Event) error { return nil }
func (rs *mockClient) QueryEvents(_ string) ([]riemann_api.Event, error) { return nil, nil }
func getMock() riemannClient { return &mockClient{} }

var _ = Describe("Driver", func() {
Describe("new", func() {
It("creates a new Riemann sink pointing to the requested instance", func() {
var _, err = new(getMock, riemannConfig{})
Expect(err).To(BeNil())
})
// func new(addr string) (sink_api.ExternalSink, error) { return nil, nil }
})

Describe("Name", func() {
It("gives a user-friendly string describing the sink", func() {
var subject, err = new(getMock, riemannConfig{})
Expect(err).To(BeNil())
var subject = &riemannSink{getMock(), riemannConfig{}, nil}

var name = subject.Name()

Expand All @@ -51,25 +43,24 @@ var _ = Describe("Driver", func() {
})

Describe("Register", func() {
// func (self *riemannSink) Register(descriptor []sink_api.MetricDescriptor) error { return nil }
// func (rs *riemannSink) Register(descriptor []sink_api.MetricDescriptor) error { return nil }
It("registers a metric with Riemann (no-op)", func() {})
})

PDescribe("StoreTimeseries", func() {
// func (self *riemannSink) StoreTimeseries(ts []sink_api.Timeseries) error { return nil }
// func (rs *riemannSink) StoreTimeseries(ts []sink_api.Timeseries) error { return nil }
It("sends a collection of Timeseries to Riemann", func() {})
})

PDescribe("StoreEvents", func() {
// func (self *riemannSink) StoreEvents(event []kube_api.Event) error { return nil }
// func (rs *riemannSink) StoreEvents(event []kube_api.Event) error { return nil }
It("sends a collection of Kubernetes Events to Riemann", func() {})
})

Describe("DebugInfo", func() {
// func (self *riemannSink) DebugInfo() string { return "" }
// func (rs *riemannSink) DebugInfo() string { return "" }
It("gives debug information specific to Riemann", func() {
var subject, err = new(getMock, riemannConfig{})
Expect(err).To(BeNil())
var subject = &riemannSink{getMock(), riemannConfig{}, nil}
var debugInfo = subject.DebugInfo()
Expect(debugInfo).ToNot(Equal(""))
})
Expand Down

0 comments on commit 22a8ea0

Please sign in to comment.