Skip to content

Fix: better logging + test last run value behavior #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .ci/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ set -ex

export USER='logstash'

bundle exec rspec spec && bundle exec rspec spec --tag integration
export LOG_LEVEL='trace'

jruby -rbundler/setup -S rspec -fd && jruby -rbundler/setup -S rspec -fd --tag integration
10 changes: 7 additions & 3 deletions .ci/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ create DATABASE jdbc_input_db;
CREATE TABLE employee (
emp_no integer NOT NULL,
first_name VARCHAR (50) NOT NULL,
last_name VARCHAR (50) NOT NULL
last_name VARCHAR (50) NOT NULL,
created_at DATE NOT NULL DEFAULT CURRENT_DATE,
updated_at TIMESTAMP
);

INSERT INTO employee VALUES (1, 'David', 'Blenkinsop');
INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer');
INSERT INTO employee VALUES (1, 'David', 'Blenkinsop', '2000-01-02', '2000-02-01 00:30:40');
INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer', '2000-01-01', '2020-01-01 20:30:40+00');
INSERT INTO employee VALUES (3, 'Ján', 'Borůvka', '2000-02-01', '2020-01-31 20:30:40+00');
INSERT INTO employee VALUES (4, 'Jožko', 'Šuška', '2010-01-01', NULL);
8 changes: 7 additions & 1 deletion lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base

config :prepared_statement_bind_values, :validate => :array, :default => []

attr_reader :database # for test mocking/stubbing
attr_reader :database, :value_tracker # for test mocking/stubbing

public

Expand Down Expand Up @@ -260,6 +260,11 @@ def register
converters[encoding] = converter
end
end

require "sequel"
require "sequel/adapters/jdbc"

Sequel.application_timezone = @plugin_timezone.to_sym
end # def register

# test injection points
Expand All @@ -268,6 +273,7 @@ def set_statement_logger(instance)
end

# Test injection point: replaces the last-run value tracker used by this
# plugin instance and logs (at debug level) which tracker was installed.
# @param instance [Object] the tracker to store in @value_tracker
# @return [Object] the tracker that was passed in
def set_value_tracker(instance)
  @logger.debug("using last run value tracker: #{instance.inspect}")
  @value_tracker = instance
end

Expand Down
6 changes: 1 addition & 5 deletions lib/logstash/plugin_mixins/jdbc/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ def complete_sequel_opts(defaults = {})
def load_driver
return @driver_impl if @driver_impl ||= nil

require "java"
require "sequel"
require "sequel/adapters/jdbc"

load_driver_jars
begin
@driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
# fix this !!!
@logger.debug("driver loading failed", :exception => e.class, :message => e.message)
message = if jdbc_driver_library_set?
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
else
Expand Down
5 changes: 3 additions & 2 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def jdbc_connect

def open_jdbc_connection
# at this point driver is already loaded
Sequel.application_timezone = @plugin_timezone.to_sym

@database = jdbc_connect()
@database.extension(:pagination)
Expand Down Expand Up @@ -198,7 +197,8 @@ def execute_statement
begin
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
@statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row|
@statement_handler.perform_query(@database, @value_tracker.value) do |row|
@logger.trace? && @logger.trace("result row:", row)
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
Expand All @@ -208,6 +208,7 @@ def execute_statement
details[:backtrace] = e.backtrace if @logger.debug?
@logger.warn("Exception when executing JDBC query", details)
else
@logger.debug "last run value", :sql_last_value => sql_last_value
@value_tracker.set_value(sql_last_value)
ensure
close_jdbc_connection
Expand Down
82 changes: 46 additions & 36 deletions lib/logstash/plugin_mixins/jdbc/statement_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
module LogStash module PluginMixins module Jdbc
class StatementHandler
# Chooses the handler class that matches the plugin configuration and
# returns a new instance of it.
# Prepared statements take precedence; paging is only considered when
# prepared statements are not in use.
# @param plugin [Object] must respond to use_prepared_statements and jdbc_paging_enabled
# @param logger [Object] passed unchanged to the handler's constructor
# @return [Object] a PreparedStatementHandler, PagingStatementHandler or NormalStatementHandler
def self.build_statement_handler(plugin, logger)
  klass =
    if plugin.use_prepared_statements
      PreparedStatementHandler
    elsif plugin.jdbc_paging_enabled
      PagingStatementHandler
    else
      NormalStatementHandler
    end
  klass.new(plugin, logger)
end

Expand All @@ -29,18 +37,10 @@ class NormalStatementHandler < StatementHandler
# @param db [Sequel::Database]
# @param sql_last_value [Integer|DateTime|Time]
# @yieldparam row [Hash{Symbol=>Object}]
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
def perform_query(db, sql_last_value)
query = build_query(db, sql_last_value)
if jdbc_paging_enabled
query.each_page(jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
yield row
end
end
else
query.each do |row|
yield row
end
query.each do |row|
yield row
end
end

Expand All @@ -67,14 +67,32 @@ def post_init(plugin)
end
end

# Statement handler selected when paging is enabled. It reuses the query
# built by the inherited build_query and walks the result page by page.
class PagingStatementHandler < NormalStatementHandler
  # @param plugin [Object] plugin configuration; its jdbc_page_size is read once and kept
  # @param statement_logger [Object] forwarded unchanged to the parent constructor
  def initialize(plugin, statement_logger)
    super
    @page_size = plugin.jdbc_page_size
  end

  # Performs the query one page at a time, yielding once per row of data
  # @param db [Sequel::Database]
  # @param sql_last_value [Integer|DateTime|Time]
  # @yieldparam row [Hash{Symbol=>Object}]
  def perform_query(db, sql_last_value)
    build_query(db, sql_last_value).each_page(@page_size) do |page|
      page.each { |row| yield row }
    end
  end
end

class PreparedStatementHandler < StatementHandler
attr_reader :name, :bind_values_array, :statement_prepared, :prepared
attr_reader :name

# Performs the query, ignoring our pagination settings, yielding once per row of data
# @param db [Sequel::Database]
# @param sql_last_value [Integer|DateTime|Time]
# @yieldparam row [Hash{Symbol=>Object}]
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
def perform_query(db, sql_last_value)
query = build_query(db, sql_last_value)
query.each do |row|
yield row
Expand All @@ -84,25 +102,24 @@ def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
private

def build_query(db, sql_last_value)
@parameters = create_bind_values_hash
if statement_prepared.false?
prepended = parameters.keys.map{|v| v.to_s.prepend("$").to_sym}
if @statement_prepared.false?
prepended = parameters.keys.map { |v| v.to_s.prepend('$').to_sym }
@prepared = db[statement, *prepended].prepare(:select, name)
statement_prepared.make_true
@statement_prepared.make_true
end
# under the scheduler the Sequel database instance is recreated each time
# so the previous prepared statements are lost, add back
if db.prepared_statement(name).nil?
db.set_prepared_statement(name, prepared)
db.set_prepared_statement(name, @prepared)
end
bind_value_sql_last_value(sql_last_value)
bind_sql_last_value_parameter(sql_last_value)
statement_logger.log_statement_parameters(statement, parameters, nil)
begin
db.call(name, parameters)
rescue => e
# clear the statement prepared flag - the statement may be closed by this
# time.
statement_prepared.make_false
@statement_prepared.make_false
raise e
end
end
Expand All @@ -113,24 +130,17 @@ def post_init(plugin)
@statement_logger.disable_count

@name = plugin.prepared_statement_name.to_sym
@bind_values_array = plugin.prepared_statement_bind_values
@parameters = plugin.parameters
@statement_prepared = Concurrent::AtomicBoolean.new(false)
end
@parameters = {} # plugin.parameters are ignored in favor of prepared_statement_bind_values
plugin.prepared_statement_bind_values.each_with_index { |v,i| @parameters[:"p#{i}"] = v }

sql_last_value_pair = @parameters.find { |_, value| value == ":sql_last_value" }
@sql_last_value_key = sql_last_value_pair ? sql_last_value_pair.first : nil

def create_bind_values_hash
hash = {}
bind_values_array.each_with_index {|v,i| hash[:"p#{i}"] = v}
hash
@statement_prepared = Concurrent::AtomicBoolean.new(false)
end

def bind_value_sql_last_value(sql_last_value)
parameters.keys.each do |key|
value = parameters[key]
if value == ":sql_last_value"
parameters[key] = sql_last_value
end
end
# Stores the current sql_last_value under the bind parameter key held in
# @sql_last_value_key. Does nothing when that key is nil.
# @param sql_last_value [Object] value to bind for this run
# @return [Object, nil] the bound value, or nil when there is no key to bind
def bind_sql_last_value_parameter(sql_last_value)
  return unless @sql_last_value_key

  parameters[@sql_last_value_key] = sql_last_value
end
end
end end end
Loading