ECS Compatibility #57

Open: wants to merge 5 commits into base: main
3 changes: 2 additions & 1 deletion .ci/run.sh
@@ -5,4 +5,5 @@ set -ex

export USER='logstash'

bundle exec rspec spec && bundle exec rspec spec --tag integration
bundle exec rspec spec --format documentation
bundle exec rspec spec --tag integration --format documentation
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
## 5.1.0
- ECS Compatibility Improvements [#57](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/57)
  - JDBC Input:
    - Added `target` option, making it easier for users to avoid populating the top-level namespace of events.
    - Added helpful warning for instances of this plugin that are run in an ECS-compatibility mode without specifying a `target`, since this configuration is likely to cause events to violate the Elastic Common Schema.
  - JDBC Streaming Filter:
    - Fixed validation of existing `target` option to occur before plugin starts up, preventing runtime crashes that would occur when an invalid field reference was supplied.

## 5.0.6
- DOC:Replaced plugin_header file with plugin_header-integration file. [#40](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/40)

4 changes: 2 additions & 2 deletions docs/filter-jdbc_streaming.asciidoc
@@ -110,7 +110,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-statement>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-tag_on_default_use>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-target>> |<<field-reference,field-reference>>|Yes
| <<plugins-{type}s-{plugin}-use_cache>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-use_prepared_statements>> |<<boolean,boolean>>|No
|=======================================================================
@@ -292,7 +292,7 @@ Append values to the `tags` field if sql error occurred.
===== `target`

* This is a required setting.
* Value type is <<string,string>>
* Value type is <<field-reference,field-reference>>
* There is no default value for this setting.

Define the target field to store the extracted result(s).
35 changes: 35 additions & 0 deletions docs/input-jdbc.asciidoc
@@ -173,6 +173,16 @@ input {
}
---------------------------------------------------------------------------------------------------

==== Compatibility with the Elastic Common Schema (ECS)

This plugin will produce events to your pipeline in the shape produced by
your SQL query, and on its own doesn't ensure compatibility with ECS. To
make it easier to produce ECS-compliant events, a pipeline can specify a
<<plugins-{type}s-{plugin}-target>> field in which to place the raw record.

When run in an <<plugins-{type}s-{plugin}-ecs_compatibility>> mode, this
input will emit a log warning of the risk unless a <<plugins-{type}s-{plugin}-target>>
is specified.

[id="plugins-{type}s-{plugin}-options"]
==== Jdbc Input Configuration Options
@@ -186,6 +196,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-columns_charset>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-connection_retry_attempts>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-connection_retry_attempts_wait_time>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-jdbc_connection_string>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-jdbc_default_timezone>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-jdbc_driver_class>> |<<string,string>>|Yes
@@ -211,6 +222,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-sql_log_level>> |<<string,string>>, one of `["fatal", "error", "warn", "info", "debug"]`|No
| <<plugins-{type}s-{plugin}-statement>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-statement_filepath>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-target>> |<<field-reference,field-reference>>|No
| <<plugins-{type}s-{plugin}-tracking_column>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-tracking_column_type>> |<<string,string>>, one of `["numeric", "timestamp"]`|No
| <<plugins-{type}s-{plugin}-use_column_value>> |<<boolean,boolean>>|No
@@ -268,6 +280,17 @@ Maximum number of times to try connecting to database

Number of seconds to sleep between connection attempts

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: no schema help is offered
** `v1`: logs a helpful warning on startup unless <<plugins-{type}s-{plugin}-target>> is specified
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`.

[id="plugins-{type}s-{plugin}-jdbc_connection_string"]
===== `jdbc_connection_string`

@@ -535,6 +558,18 @@ with the `parameters` setting.

Path of file containing statement to execute

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is https://www.elastic.co/guide/en/logstash/master/field-references-deepdive.html[field reference]
* There is no default value for this setting.

Without a `target`, events are created from each record's fields at the root level.
When the `target` is set to a field reference, the fields are nested in the target field instead.

This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced.
It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][jdbc_record]"`).

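As a rough illustration of the nesting described for `target`, the following plain-Ruby sketch mimics how a record might end up either at the root of an event or under a target field. It deliberately avoids the Logstash API: `field_reference_path` and `build_event_data` are hypothetical helpers, the event is modelled as a nested Hash, and only the bare-name and simple `[a][b]` forms of a field reference are handled.

```ruby
# Hypothetical helper (not part of Logstash): splits "[a][b]" into ["a", "b"];
# a bare name such as "db" becomes ["db"].
def field_reference_path(reference)
  segments = reference.scan(/\[([^\[\]]+)\]/).flatten
  segments.empty? ? [reference] : segments
end

# Hypothetical stand-in for event construction, using a nested Hash
# instead of LogStash::Event.
def build_event_data(row, target = nil)
  # Without a target, the record's columns become top-level fields.
  return row.dup if target.nil?

  path = field_reference_path(target)
  data = {}
  # Create the intermediate hashes, then place the record at the last segment.
  parent = path[0...-1].reduce(data) { |node, key| node[key] = {} }
  parent[path.last] = row.dup
  data
end

row = { "num" => 1, "string" => "A test" }

top_level = build_event_data(row)
nested    = build_event_data(row, "[@metadata][jdbc_raw]")

puts top_level.inspect
puts nested.inspect
```

In this sketch the untargeted record occupies the root namespace, while the targeted one sits entirely under `@metadata`, which is the shape the documentation above describes as being kept out of outputs.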
[id="plugins-{type}s-{plugin}-tracking_column"]
===== `tracking_column`

5 changes: 4 additions & 1 deletion lib/logstash/filters/jdbc_streaming.rb
@@ -6,6 +6,7 @@
require "logstash/plugin_mixins/jdbc_streaming/cache_payload"
require "logstash/plugin_mixins/jdbc_streaming/statement_handler"
require "logstash/plugin_mixins/jdbc_streaming/parameter_handler"
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require "lru_redux"

# This filter executes a SQL query and stores the result set in the field
@@ -47,6 +48,8 @@
# }
#
module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::JdbcStreaming

@@ -61,7 +64,7 @@ module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base

# Define the target field to store the extracted result(s)
# Field is overwritten if it exists
config :target, :validate => :string, :required => true
config :target, :validate => :field_reference, :required => true

# Define a default object to use when lookup fails to return a matching row.
# ensure that the key names of this object match the columns from the statement
25 changes: 24 additions & 1 deletion lib/logstash/inputs/jdbc.rb
@@ -3,6 +3,8 @@
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc/jdbc"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'

# this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0
require_relative "tzinfo_jruby_patch"
@@ -127,6 +129,9 @@
# ---------------------------------------------------------------------------------------------------
#
module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::ECSCompatibilitySupport
include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::Jdbc::Jdbc
config_name "jdbc"
@@ -161,6 +166,9 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
# exactly once.
config :schedule, :validate => :string

# If set, the fields from each record will be added nested under the target instead of at the top-level
config :target, :validate => :field_reference

# Path to file with last run time
config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"

@@ -260,6 +268,13 @@ def register
converters[encoding] = converter
end
end

if target.nil? && ecs_compatibility != :disabled
  logger.warn("The JDBC Input is configured to run in ECS Compatibility mode `#{ecs_compatibility}`, but without a specified `target`. " +
              "Fields from your SQL query will be added directly to the root level of Events emitted by this input; " +
              "depending on the shape of your SQL query, this may produce events that clash with the Elastic Common Schema. " +
              "To resolve, either provide a `target` or specify `ecs_compatibility => disabled`.")
end
end # def register

# test injection points
@@ -318,7 +333,7 @@ def execute_query(queue)
## do the necessary conversions to string elements
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
end
event = LogStash::Event.new(row)
event = create_targeted_event(row)
decorate(event)
queue << event
end
@@ -327,6 +342,14 @@ def execute_query(queue)

private

def create_targeted_event(row)
  return LogStash::Event.new(row) unless @target

  event = LogStash::Event.new
  event.set(@target, row)
  event
end

def enable_encoding?
  @enable_encoding
end
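The warning condition added to `register` in this file can be read as a small truth table. The sketch below is only an illustration under stated assumptions: `missing_target_warning?` is a hypothetical helper that does not exist in the plugin, and it assumes, as the comparison with `:disabled` in the diff suggests, that the ECS compatibility mode is a Symbol.

```ruby
# Hypothetical helper mirroring the condition used in `register`;
# it is not part of the plugin itself.
def missing_target_warning?(target, ecs_compatibility)
  target.nil? && ecs_compatibility != :disabled
end

# Enumerate the interesting combinations of `target` and ECS mode.
[
  [nil, :disabled],               # no target, ECS off
  [nil, :v1],                     # no target, ECS on
  ["[@metadata][jdbc_raw]", :v1], # target set, ECS on
].each do |target, mode|
  puts "target=#{target.inspect} ecs_compatibility=#{mode.inspect} " \
       "warn=#{missing_target_warning?(target, mode)}"
end
```

Only the combination of an ECS mode other than `:disabled` and a missing `target` triggers the warning, which matches the two remedies the log message itself suggests.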
4 changes: 3 additions & 1 deletion logstash-integration-jdbc.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.0.6'
s.version = '5.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -26,6 +26,8 @@ Gem::Specification.new do |s|
s.add_development_dependency 'jar-dependencies', '~> 0.3'

s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.0'
# Restrict use of this plugin to versions of Logstash where support for integration plugins is present.
s.add_runtime_dependency "logstash-core", ">= 6.5.0"
s.add_runtime_dependency 'logstash-codec-plain'
17 changes: 17 additions & 0 deletions spec/filters/jdbc_streaming_spec.rb
@@ -347,5 +347,22 @@ class TestJdbcStreaming < JdbcStreaming
end
end

describe "using `target` parameter" do
  let(:settings) do
    {
      "statement" => "SELECT 1",
      "target" => target,
      "jdbc_user" => ENV['USER'],
      "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
      "jdbc_connection_string" => jdbc_connection_string,
    }
  end
  context 'when target is not a valid field reference' do
    let(:target) { '][ |\| \/ /\ |_ ][ [)' }
    it 'refuses to register the plugin' do
      expect { JdbcStreaming.new(settings) }.to raise_exception(LogStash::ConfigurationError)
    end
  end
end
end
end end
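To make the intent of the spec above more concrete, here is a deliberately simplified, plain-Ruby sketch of what rejecting a malformed field reference at configuration time could look like. It is not the validator from `logstash-mixin-validator_support` and not the Logstash field-reference parser: `plausible_field_reference?` and both patterns are hypothetical, and they assume a reference is either a bare field name without brackets or one or more bracketed segments such as `[a][b]`. The real parser may accept forms this sketch rejects.

```ruby
# Simplified stand-in for field-reference validation; NOT the Logstash parser.
# Assumption: a reference is either a bare name with no brackets,
# or a sequence of one or more [segment] path fragments.
BARE_FIELD_NAME = /\A[^\[\]]+\z/
BRACKETED_PATH  = /\A(?:\[[^\[\]]+\])+\z/

# Hypothetical helper: true when the value looks like a well-formed reference.
def plausible_field_reference?(value)
  value.is_a?(String) &&
    (BARE_FIELD_NAME.match?(value) || BRACKETED_PATH.match?(value))
end

candidates = [
  "[@metadata][jdbc_record]",
  "lookup",
  '][ |\| \/ /\ |_ ][ [)', # the malformed value used in the spec above
]

candidates.each do |candidate|
  puts "#{candidate.inspect} plausible=#{plausible_field_reference?(candidate)}"
end
```

Running a check of this kind while the configuration is parsed, rather than when the first event is processed, is what turns a runtime crash into a configuration error.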
50 changes: 50 additions & 0 deletions spec/inputs/jdbc_spec.rb
@@ -1358,6 +1358,56 @@
end
end

context "when using `target` config" do
  let(:target) { "[@metadata][jdbc_raw]" }
  let(:settings) do
    {
      "statement" => "SELECT * from types_table",
      "target" => target,
    }
  end
  before do
    db << "INSERT INTO types_table (num, string, started_at, custom_time, ranking) VALUES (1, 'A test', '1999-12-31', '1999-12-31 23:59:59', 95.67)"

    plugin.register
  end

  after do
    plugin.stop
  end

  it "should place all columns in the target field" do
    plugin.run(queue)
    event = queue.pop
    aggregate_failures('the resulting event') do
      expect(event.get("[#{target}][num]")).to eq(1)
      expect(event.get("[#{target}][string]")).to eq("A test")
      expect(event.get("[#{target}][started_at]")).to be_a(LogStash::Timestamp)
      expect(event.get("[#{target}][started_at]").to_s).to eq("1999-12-31T00:00:00.000Z")
      expect(event.get("[#{target}][custom_time]")).to be_a(LogStash::Timestamp)
      expect(event.get("[#{target}][custom_time]").to_s).to eq("1999-12-31T23:59:59.000Z")
      expect(event.get("[#{target}][ranking]").to_f).to eq(95.67)
    end
  end
end

context 'when `target` is unspecified and `ecs_compatibility` is requested' do
  before(:each) do
    allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)
  end
  let(:settings) do
    {
      "statement" => "SELECT * from types_table",
      "ecs_compatibility" => "v1"
    }
  end
  it 'emits a log warning when registered' do
    plugin.register

    expect(plugin.logger).to have_received(:warn).with(/ECS Compatibility mode/)
  end
end

context "when debug logging and a count query raises a count related error" do
let(:settings) do
{ "statement" => "SELECT * from types_table" }