From 2f0c75e6a0b2f4b57f173600769957e14ce563e6 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 25 Oct 2022 21:12:16 +0000 Subject: [PATCH] fix: when using `delimiter`, ensure codec flush consumes buffer Resolves #100 --- CHANGELOG.md | 1 + lib/logstash/codecs/cef.rb | 10 +++++++ spec/codecs/cef_spec.rb | 53 +++++++++++++++++++++++++++++++++----- 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index acf6aae..eba5f86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 6.2.6 - Fix: when decoding, escaped newlines and carriage returns in extension values are now correctly decoded into literal newlines and carriage returns respectively [#98](https://github.com/logstash-plugins/logstash-codec-cef/pull/98) - Fix: when decoding, non-CEF payloads are identified and intercepted to prevent data-loss and corruption. They now cause a descriptive log message to be emitted, and are emitted as their own `_cefparsefailure`-tagged event containing the original bytes in its `message` field [#99](https://github.com/logstash-plugins/logstash-codec-cef/issues/99) + - Fix: when decoding while configured with a `delimiter`, flushing this codec now correctly consumes the remainder of its internal buffer. This resolves an issue where bytes that are written without a trailing delimiter could be lost [#100](https://github.com/logstash-plugins/logstash-codec-cef/issues/100) ## 6.2.5 - [DOC] Update link to CEF implementation guide [#97](https://github.com/logstash-plugins/logstash-codec-cef/pull/97) diff --git a/lib/logstash/codecs/cef.rb b/lib/logstash/codecs/cef.rb index 16fa88a..86d12ed 100644 --- a/lib/logstash/codecs/cef.rb +++ b/lib/logstash/codecs/cef.rb @@ -210,14 +210,24 @@ def initialize(params={}) public def decode(data, &block) if @delimiter + @logger.trace("Buffering #{data.bytesize}B of data") if @logger.trace? @buffer.extract(data).each do |line| + @logger.trace("Decoding #{line.bytesize + @delimiter.bytesize}B of buffered data") if @logger.trace? handle(line, &block) end else + @logger.trace("Decoding #{data.bytesize}B of unbuffered data") if @logger.trace? handle(data, &block) end end + def flush(&block) + if @delimiter && (remainder = @buffer.flush) + @logger.trace("Flushing #{remainder.bytesize}B of buffered data") if @logger.trace? + handle(remainder, &block) unless remainder.empty? + end + end + def handle(data, &block) original_data = data.dup event = event_factory.new_event diff --git a/spec/codecs/cef_spec.rb b/spec/codecs/cef_spec.rb index db0b3d1..8b585b5 100644 --- a/spec/codecs/cef_spec.rb +++ b/spec/codecs/cef_spec.rb @@ -409,8 +409,8 @@ def validate_ecs_disabled(e) # @yieldparam event [Event] # @yieldreturn [void] # @return [Event] - def decode_one(codec, data, &block) - events = do_decode(codec, data) + def decode_one(codec, data, flush: true, &block) + events = do_decode(codec, data, flush: flush) fail("Expected one event, got #{events.size} events: #{events.inspect}") unless events.size == 1 event = events.first @@ -436,11 +436,14 @@ def decode_one(codec, data, &block) # @yieldparam event [Event] # @yieldreturn [void] # @return [Array] - def do_decode(codec, data, &block) + def do_decode(codec, data, flush: true, &block) events = [] codec.decode(data) do |event| events << event end + flush && codec.flush do |event| + events << event + end if block events.each do |event| @@ -470,7 +473,7 @@ def enriched_event_validation(event) allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) end - let (:message) { "CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" } + let(:message) { "CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" } include DecodeHelpers @@ -483,17 +486,55 @@ def enriched_event_validation(event) # Related: https://github.com/elastic/logstash/issues/1645 subject(:codec) { LogStash::Codecs::CEF.new("delimiter" => '\r\n') } + let(:message_two) { "CEF:0|fun|whimsy|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" } + + # testing implicit flush when it "should parse on the delimiter " do - do_decode(subject,message) do |e| + do_decode(subject, message, flush: false) do |e| raise Exception.new("Should not get here. If we do, it means the decoder emitted an event before the delimiter was seen?") end - decode_one(subject, "\r\n") do |e| + # the delimiter's presence flushes what we already received, but not the new bytes we send + decode_one(subject, "\r\n#{message_two}", flush: false) do |e| + validate(e) + insist { e.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "security" + insist { e.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "threatmanager" + end + + # allowing a flush emits the buffered event with our new bits appended + decode_one(subject, " split=perfect", flush: true) do |e| + validate(e) + insist { e.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "fun" + insist { e.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "whimsy" + insist { e.get("split") } == "perfect" + end + end + + it 'flushes on close' do + # message does NOT have delimiter, but we still get our event + decode_one(subject, message, flush: true) do |e| validate(e) insist { e.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "security" insist { e.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "threatmanager" end end + + it 'emits multiple from a single decode operation' do + events = do_decode(subject, "#{message}\r\n#{message_two}") + expect(events.size).to eq(2) + + enriched_event_validation(events[0]) do |event| + validate(event) + insist { event.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "security" + insist { event.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "threatmanager" + end + + enriched_event_validation(events[1]) do |event| + validate(event) + insist { event.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "fun" + insist { event.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "whimsy" + end + end end # CEF requires seven pipe-terminated headers before optional extensions