Skip to content

Commit cfd82fa

Browse files
andselyaauie
andauthored
Use integration metadata to create ES actions (part 2) (#1158)
Change the creation of actions that are passed down to Elasticsearch to use also the metadata fields set by an integration. The interested fields are version, version_type and routing, the field values are taken verbatim without placeholders resolution. The version, version_type and routing that are configured in the plugin settings have precedence on the integration ones because manifest an explicit choice made by the user. Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
1 parent 47a5169 commit cfd82fa

File tree

4 files changed

+132
-6
lines changed

4 files changed

+132
-6
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.22.0
2+
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158)
3+
14
## 11.21.0
25
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155)
36

lib/logstash/outputs/elasticsearch.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,6 @@ def event_action_tuple(event)
499499
params[retry_on_conflict_action_name] = @retry_on_conflict
500500
end
501501

502-
params[:version] = event.sprintf(@version) if @version
503-
params[:version_type] = event.sprintf(@version_type) if @version_type
504-
505502
EventActionTuple.new(action, params, event)
506503
end
507504

@@ -541,12 +538,12 @@ def initialize(bad_action)
541538
# @private shared event params factory between index and data_stream mode
542539
def common_event_params(event)
543540
event_control = event.get("[@metadata][_ingest_document]")
544-
event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil
541+
event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil
545542

546543
params = {
547544
:_id => resolve_document_id(event, event_id),
548545
:_index => resolve_index!(event, event_index),
549-
routing_field_name => @routing ? event.sprintf(@routing) : nil
546+
routing_field_name => resolve_routing(event, event_routing)
550547
}
551548

552549
target_pipeline = resolve_pipeline(event, event_pipeline)
@@ -557,9 +554,33 @@ def common_event_params(event)
557554
# }
558555
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)
559556

557+
resolved_version = resolve_version(event, event_version)
558+
resolved_version_type = resolve_version_type(event, event_version_type)
559+
# avoid to add nil valued key-value pairs
560+
params[:version] = resolved_version unless resolved_version.nil?
561+
params[:version_type] = resolved_version_type unless resolved_version_type.nil?
562+
560563
params
561564
end
562565

566+
def resolve_version(event, event_version)
567+
return event_version if event_version && !@version
568+
event.sprintf(@version) if @version
569+
end
570+
private :resolve_version
571+
572+
def resolve_version_type(event, event_version_type)
573+
return event_version_type if event_version_type && !@version_type
574+
event.sprintf(@version_type) if @version_type
575+
end
576+
private :resolve_version_type
577+
578+
def resolve_routing(event, event_routing)
579+
return event_routing if event_routing && !@routing
580+
@routing ? event.sprintf(@routing) : nil
581+
end
582+
private :resolve_routing
583+
563584
def resolve_document_id(event, event_id)
564585
return event.sprintf(@document_id) if @document_id
565586
return event_id || nil

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.21.0'
3+
s.version = '11.22.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,108 @@
275275
let(:event_fields) {{}}
276276
let(:event) { LogStash::Event.new(event_fields)}
277277

278+
context "when plugin's version is specified" do
279+
let(:options) { super().merge("version" => "123")}
280+
281+
context "when the event contains an integration metadata version" do
282+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }
283+
284+
it "plugin's version is used" do
285+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
286+
end
287+
end
288+
289+
context "when the event DOESN'T contains an integration metadata version" do
290+
it "plugin's version is used" do
291+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
292+
end
293+
end
294+
end
295+
296+
context "when plugin's version is NOT specified" do
297+
context "when the event contains an integration metadata version" do
298+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }
299+
300+
it "event's metadata version is used" do
301+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
302+
end
303+
end
304+
305+
context "when the event DOESN'T contain an integration metadata version" do
306+
it "plugin's default id mechanism is used" do
307+
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version)
308+
end
309+
end
310+
end
311+
312+
context "when plugin's version_type is specified" do
313+
let(:options) { super().merge("version_type" => "internal")}
314+
315+
context "when the event contains an integration metadata version_type" do
316+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }
317+
318+
it "plugin's version_type is used" do
319+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
320+
end
321+
end
322+
323+
context "when the event DOESN'T contains an integration metadata version_type" do
324+
it "plugin's version_type is used" do
325+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
326+
end
327+
end
328+
end
329+
330+
context "when plugin's version_type is NOT specified" do
331+
context "when the event contains an integration metadata version_type" do
332+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }
333+
334+
it "event's metadata version_type is used" do
335+
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "external")
336+
end
337+
end
338+
339+
context "when the event DOESN'T contain an integration metadata version_type" do
340+
it "plugin's default id mechanism is used" do
341+
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version_type)
342+
end
343+
end
344+
end
345+
346+
context "when plugin's routing is specified" do
347+
let(:options) { super().merge("routing" => "settings_routing")}
348+
349+
context "when the event contains an integration metadata routing" do
350+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }
351+
352+
it "plugin's routing is used" do
353+
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
354+
end
355+
end
356+
357+
context "when the event DOESN'T contains an integration metadata routing" do
358+
it "plugin's routing is used" do
359+
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
360+
end
361+
end
362+
end
363+
364+
context "when plugin's routing is NOT specified" do
365+
context "when the event contains an integration metadata routing" do
366+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }
367+
368+
it "event's metadata routing is used" do
369+
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "meta-document-routing")
370+
end
371+
end
372+
373+
context "when the event DOESN'T contain an integration metadata routing" do
374+
it "plugin's default id mechanism is used" do
375+
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil)
376+
end
377+
end
378+
end
379+
278380
context "when plugin's index is specified" do
279381
let(:options) { super().merge("index" => "index_from_settings")}
280382

0 commit comments

Comments
 (0)