Skip to content

Commit

Permalink
Merge pull request #486 from roaksoax/revert_lsf_removal
Browse files Browse the repository at this point in the history
Revert "Remove LSF from integration testing and core references (#481)"
  • Loading branch information
mashhurs authored Nov 20, 2023
2 parents f07049f + 491703e commit a6cdac9
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.7.2
- Restore accidentally removed Lumberjack event parse source lines [#486](https://github.com/logstash-plugins/logstash-input-beats/pull/486)

## 6.7.1
- bump netty to 4.1.100 and jackson to 2.15.3 [#484](https://github.com/logstash-plugins/logstash-input-beats/pull/484)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.7.1
6.7.2
7 changes: 7 additions & 0 deletions lib/logstash/inputs/beats/message_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class MessageListener
include org.logstash.beats.IMessageListener

FILEBEAT_LOG_LINE_FIELD = "message".freeze
LSF_LOG_LINE_FIELD = "line".freeze

ConnectionState = Struct.new(:ctx, :codec, :ip_address)

Expand Down Expand Up @@ -182,13 +183,19 @@ def set_nested(hash, field_name, value)
def extract_target_field(hash)
if from_filebeat?(hash)
hash.delete(FILEBEAT_LOG_LINE_FIELD).to_s
elsif from_logstash_forwarder?(hash)
hash.delete(LSF_LOG_LINE_FIELD).to_s
end
end

def from_filebeat?(hash)
!hash[FILEBEAT_LOG_LINE_FIELD].nil?
end

def from_logstash_forwarder?(hash)
!hash[LSF_LOG_LINE_FIELD].nil?
end

def increment_connection_count
current_connection_count = @connections_list.size
@metric.gauge(:current_connections, current_connection_count)
Expand Down
16 changes: 15 additions & 1 deletion lib/tasks/test.rake
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ elsif OS_PLATFORM == "darwin"
FILEBEAT_URL = "https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.0-darwin-x86_64.tar.gz"
end

LSF_URL = "https://download.elastic.co/logstash-forwarder/binaries/logstash-forwarder_#{OS_PLATFORM}_amd64"

require "fileutils"
@files=[]

Expand All @@ -25,9 +27,21 @@ namespace :test do
namespace :integration do
task :setup do
Rake::Task["test:integration:setup:filebeat"].invoke
Rake::Task["test:integration:setup:lsf"].invoke
end

namespace :setup do
desc "Download latest stable version of Logstash-forwarder"
task :lsf do
destination = File.join(VENDOR_PATH, "logstash-forwarder")
FileUtils.rm_rf(destination)
FileUtils.mkdir_p(destination)
download_destination = File.join(destination, "logstash-forwarder")
puts "Logstash-forwarder: downloading from #{LSF_URL} to #{download_destination}"
download(LSF_URL, download_destination)
File.chmod(0755, download_destination)
end

desc "Download nigthly filebeat for integration testing"
task :filebeat do
FileUtils.mkdir_p(VENDOR_PATH)
Expand All @@ -46,7 +60,7 @@ namespace :test do
end

# Uncompress all the file from the archive this only work with
# one level directory structure and filebeat packaging.
# one level directory structure and its fine for LSF and filebeat packaging.
def untar_all(file, destination)
untar(file) do |entry|
out = entry.full_name.split("/").last
Expand Down
10 changes: 10 additions & 0 deletions spec/inputs/beats/message_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ def flush(&block)
end
end

context "when the message is from LSF" do
let(:message) { MockMessage.new("abc", { "line" => "hello world", '@metadata' => {} } )}

it "extract the event" do
subject.onNewMessage(ctx, message)
event = queue.pop
expect(event.get("message")).to eq("hello world")
end
end

it_behaves_like "when the message is from any libbeat", :disabled, "[@metadata][ip_address]"
it_behaves_like "when the message is from any libbeat", :v1, "[@metadata][input][beats][host][ip]"
it_behaves_like "when the message is from any libbeat", :v8, "[@metadata][input][beats][host][ip]"
Expand Down
108 changes: 108 additions & 0 deletions spec/integration/logstash_forwarder_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# encoding: utf-8
require "logstash/inputs/beats"
require "logstash/json"
require "fileutils"
require_relative "../support/flores_extensions"
require_relative "../support/file_helpers"
require_relative "../support/integration_shared_context"
require_relative "../support/client_process_helpers"
require "spec_helper"

LSF_BINARY = File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "vendor", "logstash-forwarder", "logstash-forwarder"))

describe "Logstash-Forwarder", :integration => true do
include ClientProcessHelpers

before :all do
unless File.exist?(LSF_BINARY)
raise "Cannot find `logstash-forwarder` binary in `vendor/logstash-forwarder` Did you run `bundle exec rake test:integration:setup` before running the integration suite?"
end
end

include FileHelpers
include_context "beats configuration"

let(:client_wait_time) { 5 }

# Filebeat related variables
let(:cmd) { [lsf_exec, "-config", lsf_config_path] }

let(:lsf_exec) { LSF_BINARY }
let(:registry_file) { File.expand_path(File.join(File.dirname(__FILE__), "..", "..", ".logstash-forwarder")) }
let_tmp_file(:lsf_config_path) { lsf_config }
let(:log_file) { f = Stud::Temporary.file; f.close; f.path }
let(:lsf_config) do
<<-CONFIG
{
"network": {
"servers": [ "#{host}:#{port}" ],
"ssl ca": "#{certificate_authorities}"
},
"files": [
{ "paths": [ "#{log_file}" ] }
]
}
CONFIG
end
#

before :each do
FileUtils.rm_rf(registry_file) # ensure clean state between runs
start_client
sleep(1)
# let LSF start and than write the logs
File.open(log_file, "a") do |f|
f.write(events.join("\n") + "\n")
end
sleep(1) # give some time to the clients to pick up the changes
end

after :each do
stop_client
end

xcontext "Plain TCP" do
include ClientProcessHelpers

let(:certificate_authorities) { "" }

it "should not send any events" do
expect(queue.size).to eq(0), @execution_output
end
end

context "TLS" do
context "Server Verification" do
let(:input_config) do
super().merge({
"ssl_enabled" => true,
"ssl_certificate" => certificate_file,
"ssl_key" => certificate_key_file,
})
end

let(:certificate_data) { Flores::PKI.generate }
let_tmp_file(:certificate_file) { certificate_data.first }
let_tmp_file(:certificate_key_file) { convert_to_pkcs8(certificate_data.last) }
let(:certificate_authorities) { certificate_file }

context "self signed certificate" do
include_examples "send events"
end

context "with large batches" do
let(:number_of_events) { 10_000 }
include_examples "send events"
end

context "invalid CA on the client" do
let(:invalid_data) { Flores::PKI.generate }
let(:certificate_authorities) { f = Stud::Temporary.file; f.close; f.path }

it "should not send any events" do
expect(queue.size).to eq(0), @execution_output
end
end
end
end
end

0 comments on commit a6cdac9

Please sign in to comment.