diff --git a/.ci/run.sh b/.ci/run.sh index 49b60b8..42b2e25 100755 --- a/.ci/run.sh +++ b/.ci/run.sh @@ -5,4 +5,5 @@ set -ex export USER='logstash' -bundle exec rspec spec && bundle exec rspec spec --tag integration +bundle exec rspec spec --format documentation +bundle exec rspec spec --tag integration --format documentation diff --git a/CHANGELOG.md b/CHANGELOG.md index d4e4ce0..45cd750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 5.1.0 + - ECS Compatibility Improvements [#57](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/57) + - JDBC Input: + - Added `target` option, making it easier for users to avoid populating the top-level namespace of events. + - Added helpful warning for instances of this plugin that are run in an ECS-compatibility mode without specifying a `target`, since this configuration is likely to cause events to violate the Elastic Common Schema. + - JDBC Streaming Filter: + - Fixed validation of existing `target` option to occur before plugin starts up, preventing runtime crashes that would occur when an invalid field reference was supplied + ## 5.0.6 - DOC:Replaced plugin_header file with plugin_header-integration file. [#40](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/40) diff --git a/docs/filter-jdbc_streaming.asciidoc b/docs/filter-jdbc_streaming.asciidoc index 84cfd2a..21a88f1 100644 --- a/docs/filter-jdbc_streaming.asciidoc +++ b/docs/filter-jdbc_streaming.asciidoc @@ -110,7 +110,7 @@ This plugin supports the following configuration options plus the <> |<>|Yes | <> |<>|No | <> |<>|No -| <> |<>|Yes +| <> |<>|Yes | <> |<>|No | <> |<>|No |======================================================================= @@ -292,7 +292,7 @@ Append values to the `tags` field if sql error occurred. ===== `target` * This is a required setting. - * Value type is <> + * Value type is <> * There is no default value for this setting. Define the target field to store the extracted result(s). diff --git a/docs/input-jdbc.asciidoc b/docs/input-jdbc.asciidoc index 16f37b7..7861594 100644 --- a/docs/input-jdbc.asciidoc +++ b/docs/input-jdbc.asciidoc @@ -173,6 +173,16 @@ input { } --------------------------------------------------------------------------------------------------- +==== Compatibility with the Elastic Common Schema (ECS) + +This plugin will produce events to your pipeline in the shape produced by +your SQL query, and on its own doesn't ensure compatibility with ECS. To +make it easier to produce ECS-compliant events, a pipeline can specify a +<> field in which to place the raw record. + +When run in an <> mode, this +input will emit a log warning of the risk unless a <> +is specified. [id="plugins-{type}s-{plugin}-options"] ==== Jdbc Input Configuration Options @@ -186,6 +196,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> | <>|No | <> |<>|Yes | <> |<>|No | <> |<>|Yes @@ -211,6 +222,7 @@ This plugin supports the following configuration options plus the <> |<>, one of `["fatal", "error", "warn", "info", "debug"]`|No | <> |<>|No | <> |a valid filesystem path|No +| <> |<>|No | <> |<>|No | <> |<>, one of `["numeric", "timestamp"]`|No | <> |<>|No @@ -268,6 +280,17 @@ Maximum number of times to try connecting to database Number of seconds to sleep between connection attempts +[id="plugins-{type}s-{plugin}-ecs_compatibility"] +===== `ecs_compatibility` + +* Value type is <> +* Supported values are: +** `disabled`: no schema help is offered +** `v1`: logs a helpful warning on startup unless <> is specified +* Default value depends on which version of Logstash is running: +** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default +** Otherwise, the default value is `disabled`. + [id="plugins-{type}s-{plugin}-jdbc_connection_string"] ===== `jdbc_connection_string` @@ -535,6 +558,18 @@ with the `parameters` setting. Path of file containing statement to execute +[id="plugins-{type}s-{plugin}-target"] +===== `target` + +* Value type is https://www.elastic.co/guide/en/logstash/master/field-references-deepdive.html[field reference] +* There is no default value for this setting. + +Without a `target`, events are created from each record's fields at the root level. +When the `target` is set to a field reference, the fields are nested in the target field instead. + +This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced. +It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][jdbc_record]"`). + [id="plugins-{type}s-{plugin}-tracking_column"] ===== `tracking_column` diff --git a/lib/logstash/filters/jdbc_streaming.rb b/lib/logstash/filters/jdbc_streaming.rb index 90a4be7..85f2bda 100644 --- a/lib/logstash/filters/jdbc_streaming.rb +++ b/lib/logstash/filters/jdbc_streaming.rb @@ -6,6 +6,7 @@ require "logstash/plugin_mixins/jdbc_streaming/cache_payload" require "logstash/plugin_mixins/jdbc_streaming/statement_handler" require "logstash/plugin_mixins/jdbc_streaming/parameter_handler" +require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' require "lru_redux" # This filter executes a SQL query and store the result set in the field @@ -47,6 +48,8 @@ # } # module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base + extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter + include LogStash::PluginMixins::Jdbc::Common include LogStash::PluginMixins::JdbcStreaming @@ -61,7 +64,7 @@ module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base # Define the target field to store the extracted result(s) # Field is overwritten if exists - config :target, :validate => :string, :required => true + config :target, :validate => :field_reference, :required => true # Define a default object to use when lookup fails to return a matching row. # ensure that the key names of this object match the columns from the statement diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 4b138c4..e9b1bf1 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -3,6 +3,8 @@ require "logstash/namespace" require "logstash/plugin_mixins/jdbc/common" require "logstash/plugin_mixins/jdbc/jdbc" +require 'logstash/plugin_mixins/ecs_compatibility_support' +require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' # this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0 require_relative "tzinfo_jruby_patch" @@ -127,6 +129,9 @@ # --------------------------------------------------------------------------------------------------- # module LogStash module Inputs class Jdbc < LogStash::Inputs::Base + extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter + + include LogStash::PluginMixins::ECSCompatibilitySupport include LogStash::PluginMixins::Jdbc::Common include LogStash::PluginMixins::Jdbc::Jdbc config_name "jdbc" @@ -161,6 +166,9 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base # exactly once. config :schedule, :validate => :string + # If set, the fields from each record will be added nested under the target instead of at the top-level + config :target, :valiadte => :field_reference + # Path to file with last run time config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run" @@ -260,6 +268,13 @@ def register converters[encoding] = converter end end + + if target.nil? && ecs_compatibility != :disabled + logger.warn("The JDBC Input is configured to run in ECS Compatibility mode `#{ecs_compatibility}`, but without a specified `target`. " + + "Fields from your SQL query will be added directly to the root level of Events emitted by this input; " + + "depending on the shape of your SQL query, this may produce events that clash with the Elastic Common Schema." + + "To resolve, either provide a `target` or specify `ecs_compatibility => disabled`.") + end end # def register # test injection points @@ -318,7 +333,7 @@ def execute_query(queue) ## do the necessary conversions to string elements row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }] end - event = LogStash::Event.new(row) + event = create_targeted_event(row) decorate(event) queue << event end @@ -327,6 +342,14 @@ def execute_query(queue) private + def create_targeted_event(row) + return LogStash::Event.new(row) unless @target + + event = LogStash::Event.new + event.set(@target, row) + event + end + def enable_encoding? @enable_encoding end diff --git a/logstash-integration-jdbc.gemspec b/logstash-integration-jdbc.gemspec index f6bb23c..2fda06a 100755 --- a/logstash-integration-jdbc.gemspec +++ b/logstash-integration-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-jdbc' - s.version = '5.0.6' + s.version = '5.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Integration with JDBC - input and filter plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -26,6 +26,8 @@ Gem::Specification.new do |s| s.add_development_dependency 'jar-dependencies', '~> 0.3' s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0' + s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.0' # Restrict use of this plugin to versions of Logstash where support for integration plugins is present. s.add_runtime_dependency "logstash-core", ">= 6.5.0" s.add_runtime_dependency 'logstash-codec-plain' diff --git a/spec/filters/jdbc_streaming_spec.rb b/spec/filters/jdbc_streaming_spec.rb index ce19329..7eeffb8 100644 --- a/spec/filters/jdbc_streaming_spec.rb +++ b/spec/filters/jdbc_streaming_spec.rb @@ -347,5 +347,22 @@ class TestJdbcStreaming < JdbcStreaming end end + describe "using `target` parameter" do + let(:settings) do + { + "statement" => "SELECT 1", + "target" => target, + "jdbc_user" => ENV['USER'], + "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => jdbc_connection_string, + } + end + context 'when target is not a valid field reference' do + let(:target) { '][ |\| \/ /\ |_ ][ [)' } + it 'refuses to register the plugin' do + expect { JdbcStreaming.new(settings) }.to raise_exception(LogStash::ConfigurationError) + end + end + end end end end diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 7147256..e6ed05f 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -1358,6 +1358,56 @@ end end + context "when using `target` config" do + let(:target) { "[@metadata][jdbc_raw]" } + let(:settings) do + { + "statement" => "SELECT * from types_table", + "target" => target, + } + end + before do + db << "INSERT INTO types_table (num, string, started_at, custom_time, ranking) VALUES (1, 'A test', '1999-12-31', '1999-12-31 23:59:59', 95.67)" + + plugin.register + end + + after do + plugin.stop + end + + it "should place all columns in the target field" do + plugin.run(queue) + event = queue.pop + aggregate_failures('the resulting event') do + expect(event.get("[#{target}][num]")).to eq(1) + expect(event.get("[#{target}][string]")).to eq("A test") + expect(event.get("[#{target}][started_at]")).to be_a(LogStash::Timestamp) + expect(event.get("[#{target}][started_at]").to_s).to eq("1999-12-31T00:00:00.000Z") + expect(event.get("[#{target}][custom_time]")).to be_a(LogStash::Timestamp) + expect(event.get("[#{target}][custom_time]").to_s).to eq("1999-12-31T23:59:59.000Z") + expect(event.get("[#{target}][ranking]").to_f).to eq(95.67) + end + end + end + + context 'when `target` is unspecified and `ecs_compatibilty` is requested' do + before(:each) do + allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object) + end + let(:settings) do + { + "statement" => "SELECT * from types_table", + "ecs_compatibility" => "v1" + } + end + it 'emits a log warning when registered' do + plugin.register + + expect(plugin.logger).to have_received(:warn).with(/ECS Compatibility mode/) + end + end + context "when debug logging and a count query raises a count related error" do let(:settings) do { "statement" => "SELECT * from types_table" }