diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index bde8d3bff4d..b72490bae45 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -101,12 +101,20 @@ def tag(newtag) @tags << newtag end # def tag - # if you override stop, don't forget to call super - # as the first action public + # override stop if you need to do more than do_stop to + # enforce the input plugin to return from `run`. + # e.g. a tcp plugin might need to close the tcp socket + # so blocking read operation aborts def stop + # override if necessary + end + + public + def do_stop @logger.debug("stopping", :plugin => self) @stop_called.make_true + stop end # stop? should never be overriden diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index c66681bf6f8..44eeb2fcf46 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -186,7 +186,7 @@ def inputworker(plugin) sleep(1) retry ensure - plugin.close + plugin.do_close end end # def inputworker @@ -214,7 +214,7 @@ def filterworker @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace) end - @filters.each(&:close) + @filters.each(&:do_close) end # def filterworker def outputworker @@ -228,7 +228,7 @@ def outputworker end ensure @outputs.each do |output| - output.worker_plugins.each(&:close) + output.worker_plugins.each(&:do_close) end end # def outputworker @@ -245,7 +245,7 @@ def shutdown InflightEventsReporter.logger = @logger InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs) - @inputs.each(&:stop) + @inputs.each(&:do_stop) end # def shutdown def plugin(plugin_type, name, *args) diff --git a/lib/logstash/plugin.rb b/lib/logstash/plugin.rb index eabed9b1a7d..e4ed6171ecc 100644 --- a/lib/logstash/plugin.rb +++ b/lib/logstash/plugin.rb @@ -28,12 +28,19 @@ def initialize(params=nil) @logger = Cabin::Channel.get(LogStash) end + # close is called during shutdown, after the plugin worker + # main task terminates + public + def do_close + @logger.debug("closing", :plugin => self) + close + end + # Subclasses should implement this close method if you need to perform any # special tasks during shutdown (like flushing, etc.) - # if you override close, don't forget to call super public def close - @logger.debug("closing", :plugin => self) + # .. end def to_s