Skip to content

Commit

Permalink
fix: proactively detect malformed CEF
Browse files Browse the repository at this point in the history
When encountering malformed-CEF or non-CEF payloads, this plugin now emits
helpful descriptive log messages, and prevents data-loss and corruption by
emitting an event tagged with `_cefparsefailure` containing the bytes it
received.

This set of changes catches 3 distinct cases of malformed payloads.

  - missing one or more of the 7 required CEF header fields; a payload that
    does not have all 7 unescaped-pipe-terminated header fields cannot be
    reliably interpreted as CEF (prevents corruption).
  - containing something OTHER than a sequence of key=value pairs in the
    extensions space (prevent data-loss; previously when extensions were
    invalid they were silently omitted)
  - containing unescaped newlines (prevents corruption; previously data after
    the first newline was injected into the currently-parsed extension field)

In catching these classes of malformed inputs, this changeset also
resolves logstash-plugins#99 in which our failure
to detect a malformed input proactively caused an unhelpful `NoMethodError`
message to be logged before a `_cefparsefailure`-tagged event was emitted.
  • Loading branch information
yaauie committed Oct 25, 2022
1 parent 842e838 commit e34436f
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 6.2.6
- Fix: when decoding, escaped newlines and carriage returns in extension values are now correctly decoded into literal newlines and carriage returns respectively [#98](https://github.com/logstash-plugins/logstash-codec-cef/pull/98)
- Fix: when decoding, non-CEF payloads are identified and intercepted to prevent data-loss and corruption. They now cause a descriptive log message to be emitted, and are emitted as their own `_cefparsefailure`-tagged event containing the original bytes in its `message` field [#99](https://github.com/logstash-plugins/logstash-codec-cef/issues/99)

## 6.2.5
- [DOC] Update link to CEF implementation guide [#97](https://github.com/logstash-plugins/logstash-codec-cef/pull/97)
Expand Down
47 changes: 36 additions & 11 deletions lib/logstash/codecs/cef.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class LogStash::Codecs::CEF < LogStash::Codecs::Base
# - non-pipe characters
HEADER_PATTERN = /(?:\\\||\\\\|[^|])*?/

# Cache of a scanner pattern that _captures_ a HEADER followed by an unescaped pipe
HEADER_SCANNER = /(#{HEADER_PATTERN})#{Regexp.quote('|')}/
# Cache of a scanner pattern that _captures_ a HEADER followed by EOF or an unescaped pipe
HEADER_NEXT_FIELD_PATTERN = /(#{HEADER_PATTERN})#{Regexp.quote('|')}/

# Cache of a gsub pattern that matches a backslash-escaped backslash or backslash-escaped pipe, _capturing_ the escaped character
HEADER_ESCAPE_CAPTURE = /\\([\\|])/
Expand Down Expand Up @@ -136,8 +136,8 @@ class LogStash::Codecs::CEF < LogStash::Codecs::Base
# - runs of whitespace that are NOT followed by something that looks like a key-equals sequence
EXTENSION_VALUE_PATTERN = /(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/

# Cache of a scanner pattern that _captures_ extension field key/value pairs
EXTENSION_KEY_VALUE_SCANNER = /(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/
# Cache of a pattern that _captures_ the NEXT extension field key/value pair
EXTENSION_NEXT_KEY_VALUE_PATTERN = /^(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/

##
# @see CEF#sanitize_header_field
Expand Down Expand Up @@ -235,10 +235,16 @@ def handle(data, &block)
end

# Use a scanning parser to capture the HEADER_FIELDS
unprocessed_data = data
@header_fields.each do |field_name|
match_data = HEADER_SCANNER.match(unprocessed_data)
break if match_data.nil? # missing fields
unprocessed_data = data.chomp
if unprocessed_data.include?(LITERAL_NEWLINE)
fail("message is not valid CEF because it contains unescaped newline characters; " +
"use the `delimiter` setting to enable in-codec buffering and delimiter-splitting")
end
@header_fields.each_with_index do |field_name, idx|
match_data = HEADER_NEXT_FIELD_PATTERN.match(unprocessed_data)
if match_data.nil?
fail("message is not valid CEF; found #{idx} of 7 required pipe-terminated header fields")
end

escaped_field_value = match_data[1]
next if escaped_field_value.nil?
Expand All @@ -265,11 +271,14 @@ def handle(data, &block)
event.set(cef_version_field, delete_cef_prefix(event.get(cef_version_field)))

# Use a scanning parser to capture the Extension Key/Value Pairs
if message && message.include?('=')
if message && !message.empty?
message = message.strip
extension_fields = {}

message.scan(EXTENSION_KEY_VALUE_SCANNER) do |extension_field_key, raw_extension_field_value|
while (match = message.match(EXTENSION_NEXT_KEY_VALUE_PATTERN))
extension_field_key, raw_extension_field_value = match.captures
message = match.post_match

# expand abbreviated extension field keys
extension_field_key = @decode_mapping.fetch(extension_field_key, extension_field_key)

Expand All @@ -281,6 +290,9 @@ def handle(data, &block)

extension_fields[extension_field_key] = extension_field_value
end
if !message.empty?
fail("invalid extensions; keyless value present `#{message}`")
end

# in ECS mode, normalize timestamps including timezone.
if ecs_compatibility != :disabled
Expand All @@ -300,7 +312,7 @@ def handle(data, &block)
yield event
rescue => e
@logger.error("Failed to decode CEF payload. Generating failure event with payload in message field.",
:exception => e.class, :message => e.message, :backtrace => e.backtrace, :original_data => original_data)
log_metadata(:original_data => original_data))
yield event_factory.new_event("message" => data, "tags" => ["_cefparsefailure"])
end

Expand Down Expand Up @@ -349,6 +361,19 @@ def generate_header_fields!
@syslog_header = ecs_select[disabled:'syslog',v1:'[log][syslog][header]']
end

##
# produces log metadata, injecting the current exception and log-level-relevant backtraces
# @param context [Hash{Symbol=>Object}]: the base context
def log_metadata(context={})
return context unless $!

exception_context = {}
exception_context[:exception] = "#{$!.class}: #{$!.message}"
exception_context[:backtrace] = $!.backtrace if @logger.debug?

exception_context.merge(context)
end

class CEFField
##
# @param name [String]: the full CEF name of a field
Expand Down
101 changes: 95 additions & 6 deletions spec/codecs/cef_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,16 @@ def validate_ecs_disabled(e)
# @yieldparam event [Event]
# @yieldreturn [void]
# @return [Event]
def decode_one(codec, data)
def decode_one(codec, data, &block)
events = do_decode(codec, data)
fail("Expected one event, got #{events.size} events: #{events.inspect}") unless events.size == 1
event = events.first

if block_given?
aggregate_failures('decode one') do
yield event
if block
enriched_event_validation(event) do |e|
aggregate_failures('decode one') do
yield e
end
end
end

Expand All @@ -434,16 +436,32 @@ def decode_one(codec, data)
# @yieldparam event [Event]
# @yieldreturn [void]
# @return [Array<Event>]
def do_decode(codec, data)
def do_decode(codec, data, &block)
events = []
codec.decode(data) do |event|
events << event
end

events.each { |event| yield event } if block_given?
if block
events.each do |event|
enriched_event_validation(event, &block)
end
end

events
end

##
# Enrich event validation by outputting the serialized event to stderr
# if-and-only-if the provided block's rspec expectations are not met.
#
# @param event [#to_hash_with_metadata]
def enriched_event_validation(event)
yield(event)
rescue RSpec::Expectations::ExpectationNotMetError
$stderr.puts("\e[35m#{event.to_hash_with_metadata}\e[0m\n")
raise
end
end

context "#decode", :ecs_compatibility_support do
Expand Down Expand Up @@ -478,6 +496,77 @@ def do_decode(codec, data)
end
end

# CEF requires seven pipe-terminated headers before optional extensions
context 'with a non-CEF payload' do
let(:logger_stub) { double('Logger').as_null_object }
before(:each) do
allow_any_instance_of(described_class).to receive(:logger).and_return(logger_stub)
end

context 'containing 0 header-like sections' do
let(:message) { 'this is not cef' }
it 'logs helpfully and produces a tagged event' do
do_decode(subject,message) do |event|
expect(event.get('tags')).to include('_cefparsefailure')
expect(event.get('message')).to eq(message)
end
expect(logger_stub).to have_received(:error)
.with(a_string_including('Failed to decode CEF payload. Generating failure event with payload in message field'),
a_hash_including(exception: a_string_including("found 0 of 7 required pipe-terminated header fields"),
original_data: message))
end
end
context 'containing 4 header-like sections' do
let(:message) { "a|b|c with several \\| escaped\\| pipes|d|bananas" }
it 'logs helpfully and produces a tagged event' do
do_decode(subject,message) do |event|
expect(event.get('tags')).to include('_cefparsefailure')
expect(event.get('message')).to eq(message)
end
expect(logger_stub).to have_received(:error)
.with(a_string_including('Failed to decode CEF payload. Generating failure event with payload in message field'),
a_hash_including(exception: a_string_including("found 4 of 7 required pipe-terminated header fields"),
original_data: message))
end
end
context 'containing non-key/value extensions' do
let (:message) { "CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|this is in the extensions space but it is not valid because it is not equals-separated key/value" }
it 'logs helpfully and produces a tagged event' do
do_decode(subject,message) do |event|
expect(event.get('tags')).to include('_cefparsefailure')
expect(event.get('message')).to eq(message)
end
expect(logger_stub).to have_received(:error)
.with(a_string_including('Failed to decode CEF payload. Generating failure event with payload in message field'),
a_hash_including(exception: a_string_including("invalid extensions; keyless value present"),
original_data: message))
end
end
context 'containing unescaped newlines' do
# when not using a `delimiter`, we expect exactly one CEF log per call to decode.
let (:message) {
<<~EOMESSAGE
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.67
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.67
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.67
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.67
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|src=10.0.0.67
EOMESSAGE
}
it 'logs helpfully and produces a tagged event' do
do_decode(subject, message) do |event|
expect(event.get('tags')).to include('_cefparsefailure')
expect(event.get('message')).to eq(message)
end
expect(logger_stub).to have_received(:error)
.with(a_string_including('Failed to decode CEF payload. Generating failure event with payload in message field'),
a_hash_including(exception: a_string_including("message is not valid CEF because it contains unescaped newline characters",
"use the `delimiter` setting to enable in-codec buffering and delimiter-splitting"),
original_data: message))
end
end
end

context 'when a CEF header ends with a pair of properly-escaped backslashes' do
let(:backslash) { '\\' }
let(:pipe) { '|' }
Expand Down

0 comments on commit e34436f

Please sign in to comment.