Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session handling reprise #130

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.4.3
- FIX: the input plugin's prior behaviour of opening a new database pool for each query (removed in `v5.4.1`) is restored, ensuring that infrequently-run schedules do not hold open connections to their databases indefinitely.

## 5.4.2
- Doc: described default_hash and tag_on_default_use interaction filter plugin [#122](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/122)
- Added new settings `statement_retry_attempts` and `statement_retry_attempts_wait_time` for retry of failed sql statement execution [#123](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/123)
Expand Down
23 changes: 5 additions & 18 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ def register
end
end

prepare_jdbc_connection

if @use_column_value
# Raise an error if @use_column_value is true, but no @tracking_column is set
if @tracking_column.nil?
Expand All @@ -283,7 +285,6 @@ def register
end

set_value_tracker LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)
set_statement_handler LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self)

@enable_encoding = [email protected]? || !@columns_charset.empty?

Expand All @@ -303,32 +304,18 @@ def register
converters[encoding] = converter
end
end

load_driver
begin
open_jdbc_connection
rescue Sequel::DatabaseConnectionError,
Sequel::DatabaseError,
Sequel::InvalidValue,
Java::JavaSql::SQLException => e
details = { exception: e.class, message: e.message }
details[:cause] = e.cause.inspect if e.cause
details[:backtrace] = e.backtrace if @logger.debug?
@logger.warn("Exception when executing JDBC query", details)
raise(LogStash::ConfigurationError, "Can't create a connection pool to the database")
end
end # def register

# test injection points
def set_statement_handler(handler)
@statement_handler = handler
def new_statement_handler
LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self)
end

def set_value_tracker(instance)
@value_tracker = instance
end

def run(queue)
load_driver
if @schedule
# scheduler input thread name example: "[my-oracle]|input|jdbc|scheduler"
scheduler.cron(@schedule) { execute_query(queue) }
Expand Down
23 changes: 21 additions & 2 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

# Tentative of abstracting JDBC logic to a mixin
# for potential reuse in other plugins (input/output)
#
# CAUTION: implementation of this "potential reuse" module is
# VERY tightly-coupled with the JDBC Input's implementation.
module LogStash module PluginMixins module Jdbc
module Jdbc
# This method is called when someone includes this module
Expand Down Expand Up @@ -115,7 +118,7 @@ def setup_jdbc_config

private
def jdbc_connect
sequel_opts = complete_sequel_opts(:pool_timeout => @jdbc_pool_timeout, :keep_reference => false)
sequel_opts = complete_sequel_opts(pool_timeout: @jdbc_pool_timeout, keep_reference: false, single_threaded: true)
retry_attempts = @connection_retry_attempts
loop do
retry_attempts -= 1
Expand Down Expand Up @@ -191,16 +194,27 @@ def open_jdbc_connection
else
@database.identifier_output_method = :to_s
end

@statement_handler = new_statement_handler
end

public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
end

public
def close_jdbc_connection
begin
# pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections
# connections in use won't really get closed
@database.disconnect if @database
@connection_lock.lock
@database&.disconnect
@statement_handler = nil
rescue => e
@logger.warn("Failed to close connection", :exception => e)
ensure
@connection_lock.unlock
end
end

Expand All @@ -211,6 +225,8 @@ def execute_statement

begin
retry_attempts -= 1
@connection_lock.lock
open_jdbc_connection
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
@statement_handler.perform_query(@database, @value_tracker.value) do |row|
Expand All @@ -233,6 +249,9 @@ def execute_statement
end
else
@value_tracker.set_value(sql_last_value)
ensure
close_jdbc_connection
@connection_lock.unlock
end

return success
Expand Down
42 changes: 16 additions & 26 deletions lib/logstash/plugin_mixins/jdbc/statement_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ def perform_query(db, sql_last_value)
end

class PreparedStatementHandler < StatementHandler
attr_reader :name, :bind_values_array, :statement_prepared, :prepared, :parameters
attr_reader :name

def initialize(plugin)
super(plugin)
@name = plugin.prepared_statement_name.to_sym
@bind_values_array = plugin.prepared_statement_bind_values
@parameters = plugin.parameters
@statement_prepared = Concurrent::AtomicBoolean.new(false)

@positional_bind_mapping = create_positional_bind_mapping(plugin.prepared_statement_bind_values).freeze
@positional_bind_placeholders = @positional_bind_mapping.keys.map { |v| :"$#{v}" }.freeze
end

# Performs the query, ignoring our pagination settings, yielding once per row of data
Expand All @@ -148,41 +148,31 @@ def perform_query(db, sql_last_value)
private

def build_query(db, sql_last_value)
@parameters = create_bind_values_hash
if statement_prepared.false?
prepended = parameters.keys.map{|v| v.to_s.prepend("$").to_sym}
@prepared = db[statement, *prepended].prepare(:select, name)
statement_prepared.make_true
end
# under the scheduler the Sequel database instance is recreated each time
# so the previous prepared statements are lost, add back
if db.prepared_statement(name).nil?
prepared = db[statement, *positional_bind_placeholders].prepare(:select, name)

db.set_prepared_statement(name, prepared)
end
bind_value_sql_last_value(sql_last_value)
begin
db.call(name, parameters)
rescue => e
# clear the statement prepared flag - the statement may be closed by this
# time.
statement_prepared.make_false
raise e
end

db.call(name, positional_bind_mapping(sql_last_value))
end

def create_bind_values_hash
def create_positional_bind_mapping(bind_values_array)
hash = {}
bind_values_array.each_with_index {|v,i| hash[:"p#{i}"] = v}
hash
end

def bind_value_sql_last_value(sql_last_value)
parameters.keys.each do |key|
value = parameters[key]
if value == ":sql_last_value"
parameters[key] = sql_last_value
end
def positional_bind_mapping(sql_last_value)
@positional_bind_mapping.transform_values do |value|
value == ":sql_last_value" ? sql_last_value : value
end
end

def positional_bind_placeholders
@positional_bind_mapping.keys.map { |v| :"$#{v}" }.freeze
end
end
end end end
12 changes: 9 additions & 3 deletions spec/inputs/integration/integ_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@
end

it "should not register correctly" do
plugin.register
q = Queue.new
expect do
plugin.register
plugin.run(q)
end.to raise_error(::LogStash::PluginLoadingError)
end
end
Expand All @@ -90,21 +92,25 @@
end

it "log warning msg when plugin run" do
plugin.register
expect( plugin ).to receive(:log_java_exception)
expect(plugin.logger).to receive(:warn).once.with("Exception when executing JDBC query",
hash_including(:message => instance_of(String)))
expect{ plugin.register }.to raise_error(::LogStash::ConfigurationError)
q = Queue.new
expect{ plugin.run(q) }.not_to raise_error
end

it "should log (native) Java driver error" do
plugin.register
expect( org.apache.logging.log4j.LogManager ).to receive(:getLogger).and_wrap_original do |m, *args|
logger = m.call(*args)
expect( logger ).to receive(:error) do |_, e|
expect( e ).to be_a org.postgresql.util.PSQLException
end.and_call_original
logger
end
expect{ plugin.register }.to raise_error(::LogStash::ConfigurationError)
q = Queue.new
expect{ plugin.run(q) }.not_to raise_error
end
end
end
Expand Down
14 changes: 11 additions & 3 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,9 @@
queue = Queue.new
plugin.register

handler = plugin.instance_variable_get(:@statement_handler)
handler = double('StatementHandler')
expect(plugin).to receive(:new_statement_handler).and_return(handler).once

allow(handler).to receive(:perform_query).with(instance_of(Sequel::JDBC::Database), instance_of(Time)).and_raise(Sequel::PoolTimeout)
expect(plugin.logger).to receive(:error).with("Unable to execute statement. Trying again.")
expect(plugin.logger).to receive(:error).with("Unable to execute statement. Tried 2 times.")
Expand All @@ -1373,7 +1375,9 @@
queue = Queue.new
plugin.register

handler = plugin.instance_variable_get(:@statement_handler)
handler = double('StatementHandler')
expect(plugin).to receive(:new_statement_handler).and_return(handler).once

allow(handler).to receive(:perform_query).with(instance_of(Sequel::JDBC::Database), instance_of(Time)).and_call_original
expect(plugin.logger).not_to receive(:error)

Expand Down Expand Up @@ -1593,12 +1597,16 @@
{ "statement" => "SELECT * from types_table", "jdbc_driver_library" => invalid_driver_jar_path }
end

before do
plugin.register
end

after do
plugin.stop
end

it "raise a loading error" do
expect { plugin.register }.
expect { plugin.run(queue) }.
to raise_error(LogStash::PluginLoadingError, /unable to load .*? from :jdbc_driver_library, file not readable/)
end
end
Expand Down