Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for retrying output writes, using independent threads #298

Merged
merged 1 commit into from
Oct 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Agent struct {
// Interval at which to flush data
FlushInterval Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
Expand Down Expand Up @@ -61,6 +64,7 @@ func NewAgent(config *Config) (*Agent, error) {
Config: config,
Interval: Duration{10 * time.Second},
FlushInterval: Duration{10 * time.Second},
FlushRetries: 2,
UTC: true,
Precision: "s",
}
Expand Down Expand Up @@ -293,28 +297,56 @@ func (a *Agent) Test() error {
return nil
}

func (a *Agent) flush(points []*client.Point) {
var wg sync.WaitGroup

// writeOutput writes a list of points to a single output, with retries
func (a *Agent) writeOutput(
points []*client.Point,
ro *runningOutput,
shutdown chan struct{},
) {
retry := 0
retries := a.FlushRetries
start := time.Now()
counter := 0
for _, o := range a.outputs {
wg.Add(1)
counter++

go func(ro *runningOutput) {
defer wg.Done()
// Log all output errors:
if err := ro.output.Write(points); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err.Error())
for {
err := ro.output.Write(points)

select {
case <-shutdown:
return
default:
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
return
} else if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.name, err.Error(), a.FlushInterval.Duration)
time.Sleep(a.FlushInterval.Duration)
}
}(o)
}

retry++
}
}

wg.Wait()
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to %d output sinks in %s\n",
len(points), counter, elapsed)
// flush writes a list of points to all configured outputs
func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
if len(points) == 0 {
return
}

for _, o := range a.outputs {
go a.writeOutput(points, o, shutdown)
}
}

// flusher monitors the points input channel and flushes on the minimum interval
Expand All @@ -327,9 +359,11 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown)
return nil
case <-ticker.C:
a.flush(points)
a.flush(points, shutdown)
points = make([]*client.Point, 0)
case pt := <-pointChan:
points = append(points, pt)
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ var header = `# Telegraf configuration
interval = "10s"
# Default data flushing interval for all outputs
flush_interval = "10s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
Expand Down
18 changes: 7 additions & 11 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,12 @@
[agent]
# Default data collection interval for all plugins
interval = "10s"

# If utc = false, uses local time (utc is highly recommended)
utc = true

# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"

# Default data flushing interval for all outputs
flush_interval = "10s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode
debug = false

# Override default hostname, if empty use os.Hostname()
hostname = ""

Expand All @@ -54,15 +49,16 @@
# Multiple urls can be specified for InfluxDB cluster support. Server to
# write to will be randomly chosen each interval.
urls = ["http://localhost:8086"] # required.

# The target database for metrics. This database must already exist
database = "telegraf" # required.
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"

# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# timeout = "5s"

# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

Expand Down