diff --git a/CHANGELOG.md b/CHANGELOG.md index c823160d4..627d553ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,8 @@ * A prometheus remote-write sink compatible with Cortex and more. Thanks, [philipnrmn](https://github.com/philipnrmn)! * Some veneur-prometheus arguments to rename and add additional tags. Thanks, [christopherb-stripe](https://github.com/christopherb-stripe)! * Migrate Prometheus to new config format; part of multi-sink routing update. Thanks, [truong-stripe](https://github.com/truong-stripe)! -* Authentication support for Cortex remote-write sink. Thanks, -[oscil8](https://github.com/oscil8)! +* Authentication support for Cortex remote-write sink. Thanks, [oscil8](https://github.com/oscil8)! +* Option to flush sinks on shutdown. Thanks, [csolidum](https://github.com/csolidum)! ## Bugfixes * A fix for forwarding metrics with gRPC using the kubernetes discoverer. Thanks, [androohan](https://github.com/androohan)! diff --git a/config.go b/config.go index 884a70794..ae05ae053 100644 --- a/config.go +++ b/config.go @@ -37,6 +37,7 @@ type Config struct { } `yaml:"features"` FlushFile string `yaml:"flush_file"` FlushMaxPerBody int `yaml:"flush_max_per_body"` + FlushOnShutdown bool `yaml:"flush_on_shutdown"` FlushWatchdogMissedFlushes int `yaml:"flush_watchdog_missed_flushes"` ForwardAddress string `yaml:"forward_address"` ForwardUseGrpc bool `yaml:"forward_use_grpc"` diff --git a/example.yaml b/example.yaml index 2a917de60..262aca5e7 100644 --- a/example.yaml +++ b/example.yaml @@ -66,6 +66,9 @@ interval: "10s" # watchdog. flush_watchdog_missed_flushes: 0 +# Whether to flush sinks on shutdown. Defaults to false +flush_on_shutdown: false + # Veneur can "sychronize" it's flushes with the system clock, flushing at even # intervals i.e. 0, 10, 20… to align with the `interval`. This is disabled by # default for now, as it can cause thundering herds in large installations. diff --git a/server.go b/server.go index 82d4fcdde..88aa9460f 100644 --- a/server.go +++ b/server.go @@ -133,6 +133,7 @@ type Server struct { GRPCListenAddrs []net.Addr RcvbufBytes int + FlushOnShutdown bool Interval time.Duration synchronizeInterval bool numReaders int @@ -518,6 +519,8 @@ func NewFromConfig(config ServerConfig) (*Server, error) { } logger.WithField("BlockProfileRate", conf.BlockProfileRate).Info("Set block profile rate (nanoseconds)") + ret.FlushOnShutdown = conf.FlushOnShutdown + // Use the pre-allocated Workers slice to know how many to start. logger.WithField("number", len(ret.Workers)).Info("Preparing workers") for i := range ret.Workers { @@ -1398,6 +1401,11 @@ func (s *Server) Shutdown() { // TODO(aditya) shut down workers and socket readers s.logger.Info("Shutting down server gracefully") close(s.shutdown) + if s.FlushOnShutdown { + ctx, cancel := context.WithTimeout(context.Background(), s.Interval) + s.Flush(ctx) + cancel() + } graceful.Shutdown() for _, source := range s.sources { source.source.Stop() diff --git a/testdata/http_test_config.json b/testdata/http_test_config.json index d90bc1624..51e54cbfa 100644 --- a/testdata/http_test_config.json +++ b/testdata/http_test_config.json @@ -30,6 +30,7 @@ }, "FlushFile": "", "FlushMaxPerBody": 0, + "FlushOnShutdown": false, "FlushWatchdogMissedFlushes": 0, "ForwardAddress": "", "ForwardUseGrpc": false, diff --git a/testdata/http_test_config.yaml b/testdata/http_test_config.yaml index 53b9e52ef..01bb7c39b 100644 --- a/testdata/http_test_config.yaml +++ b/testdata/http_test_config.yaml @@ -27,6 +27,7 @@ features: migrate_span_sinks: false flush_file: "" flush_max_per_body: 0 +flush_on_shutdown: false flush_watchdog_missed_flushes: 0 forward_address: "" forward_use_grpc: false