Skip to content

Commit

Permalink
fix: when using delimiter, ensure codec flush consumes buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Oct 25, 2022
1 parent e34436f commit 2f0c75e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 6.2.6
- Fix: when decoding, escaped newlines and carriage returns in extension values are now correctly decoded into literal newlines and carriage returns respectively [#98](https://github.com/logstash-plugins/logstash-codec-cef/pull/98)
- Fix: when decoding, non-CEF payloads are identified and intercepted to prevent data loss and corruption. Each one now triggers a descriptive log message and is emitted as its own `_cefparsefailure`-tagged event containing the original bytes in its `message` field [#99](https://github.com/logstash-plugins/logstash-codec-cef/issues/99)
- Fix: when decoding while configured with a `delimiter`, flushing this codec now correctly consumes the remainder of its internal buffer. This resolves an issue where bytes that are written without a trailing delimiter could be lost [#100](https://github.com/logstash-plugins/logstash-codec-cef/issues/100)

## 6.2.5
- [DOC] Update link to CEF implementation guide [#97](https://github.com/logstash-plugins/logstash-codec-cef/pull/97)
Expand Down
10 changes: 10 additions & 0 deletions lib/logstash/codecs/cef.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,24 @@ def initialize(params={})
public
# Decodes a chunk of incoming data, forwarding the given block to `handle`
# for every payload that is ready to be processed.
#
# Without a configured delimiter the chunk is handed to `handle` as-is.
# With a delimiter, the chunk is first pushed through `@buffer`, and only the
# pieces that `@buffer.extract` returns are handled; anything the buffer
# keeps back stays there until a later `decode` or `flush`.
#
# @param data [#bytesize] the raw chunk to decode
# @param block passed through untouched to `handle`
def decode(data, &block)
  # Unbuffered mode: nothing to split, process the chunk directly.
  unless @delimiter
    @logger.trace? && @logger.trace("Decoding #{data.bytesize}B of unbuffered data")
    return handle(data, &block)
  end

  @logger.trace? && @logger.trace("Buffering #{data.bytesize}B of data")
  @buffer.extract(data).each do |chunk|
    # The delimiter was consumed by the buffer, so add it back for the byte count.
    @logger.trace? && @logger.trace("Decoding #{chunk.bytesize + @delimiter.bytesize}B of buffered data")
    handle(chunk, &block)
  end
end

# Drains whatever is left in the delimiter buffer and, when that remainder is
# non-empty, passes it to `handle` together with the given block.
#
# Does nothing when no delimiter is configured or when `@buffer.flush`
# returns nothing.
#
# @param block passed through untouched to `handle`
def flush(&block)
  return unless @delimiter

  remainder = @buffer.flush
  return unless remainder

  @logger.trace? && @logger.trace("Flushing #{remainder.bytesize}B of buffered data")
  # An empty remainder is still logged above, but never handed to `handle`.
  return if remainder.empty?

  handle(remainder, &block)
end

def handle(data, &block)
original_data = data.dup
event = event_factory.new_event
Expand Down
53 changes: 47 additions & 6 deletions spec/codecs/cef_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ def validate_ecs_disabled(e)
# @yieldparam event [Event]
# @yieldreturn [void]
# @return [Event]
def decode_one(codec, data, &block)
events = do_decode(codec, data)
def decode_one(codec, data, flush: true, &block)
events = do_decode(codec, data, flush: flush)
fail("Expected one event, got #{events.size} events: #{events.inspect}") unless events.size == 1
event = events.first

Expand All @@ -436,11 +436,14 @@ def decode_one(codec, data, &block)
# @yieldparam event [Event]
# @yieldreturn [void]
# @return [Array<Event>]
def do_decode(codec, data, &block)
def do_decode(codec, data, flush: true, &block)
events = []
codec.decode(data) do |event|
events << event
end
flush && codec.flush do |event|
events << event
end

if block
events.each do |event|
Expand Down Expand Up @@ -470,7 +473,7 @@ def enriched_event_validation(event)
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end

let (:message) { "CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" }
let(:message) { "CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" }

include DecodeHelpers

Expand All @@ -483,17 +486,55 @@ def enriched_event_validation(event)
# Related: https://github.com/elastic/logstash/issues/1645
subject(:codec) { LogStash::Codecs::CEF.new("delimiter" => '\r\n') }

let(:message_two) { "CEF:0|fun|whimsy|1.0|100|trojan successfully stopped|10|src=10.0.0.192 dst=12.121.122.82 spt=1232" }

# testing the implicit flush that happens when the delimiter is received
it "should parse on the delimiter " do
do_decode(subject,message) do |e|
do_decode(subject, message, flush: false) do |e|
raise Exception.new("Should not get here. If we do, it means the decoder emitted an event before the delimiter was seen?")
end

decode_one(subject, "\r\n") do |e|
# the delimiter's presence flushes what we already received, but not the new bytes we send
decode_one(subject, "\r\n#{message_two}", flush: false) do |e|
validate(e)
insist { e.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "security"
insist { e.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "threatmanager"
end

# allowing a flush emits the buffered event with our new bits appended
decode_one(subject, " split=perfect", flush: true) do |e|
validate(e)
insist { e.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == "fun"
insist { e.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == "whimsy"
insist { e.get("split") } == "perfect"
end
end

it 'flushes on close' do
  # The payload carries no trailing delimiter; requesting a flush is what
  # lets the buffered bytes come out as an event.
  decode_one(subject, message, flush: true) do |event|
    validate(event)

    vendor_field = ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]
    insist { event.get(vendor_field) } == "security"

    product_field = ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]
    insist { event.get(product_field) } == "threatmanager"
  end
end

it 'emits multiple from a single decode operation' do
  # Two payloads separated by one delimiter, decoded in a single call.
  events = do_decode(subject, "#{message}\r\n#{message_two}")
  expect(events.size).to eq(2)

  # Expected [vendor, product] per emitted event, in emission order.
  expected_identities = [
    ["security", "threatmanager"],
    ["fun", "whimsy"],
  ]

  expected_identities.each_with_index do |(vendor, product), index|
    enriched_event_validation(events[index]) do |event|
      validate(event)
      insist { event.get(ecs_select[disabled: "deviceVendor", v1:"[observer][vendor]"]) } == vendor
      insist { event.get(ecs_select[disabled: "deviceProduct", v1:"[observer][product]"]) } == product
    end
  end
end
end

# CEF requires seven pipe-terminated headers before optional extensions
Expand Down

0 comments on commit 2f0c75e

Please sign in to comment.