diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5449aee..e6242e09 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 12.0.6
+ - Add headers reporting uncompressed size and doc count for bulk requests [#1217](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1217)
+
 ## 12.0.5
  - [DOC] Fix link to Logstash DLQ docs [#1214](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1214)
 
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index 120d3e67..8d6e02cb 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch;
   # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
   # made sense. We picked one on the lowish side to not use too much heap.
   TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
-
+  EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze
+  UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze
   class HttpClient
     attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
 
@@ -143,7 +144,11 @@ def bulk(actions)
                        :payload_size => stream_writer.pos,
                        :content_length => body_stream.size,
                        :batch_offset => (index + 1 - batch_actions.size))
-          bulk_responses << bulk_send(body_stream, batch_actions)
+          headers = {
+            EVENT_COUNT_HEADER => batch_actions.size.to_s,
+            UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
+          }
+          bulk_responses << bulk_send(body_stream, batch_actions, headers)
           body_stream.truncate(0) && body_stream.seek(0)
           stream_writer = gzip_writer(body_stream) if compression_level?
           batch_actions.clear
@@ -159,7 +164,14 @@
                    :payload_size => stream_writer.pos,
                    :content_length => body_stream.size,
                    :batch_offset => (actions.size - batch_actions.size))
-    bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
+
+    if body_stream.size > 0
+      headers = {
+        EVENT_COUNT_HEADER => batch_actions.size.to_s,
+        UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
+      }
+      bulk_responses << bulk_send(body_stream, batch_actions, headers)
+    end
 
     body_stream.close unless compression_level?
     join_bulk_responses(bulk_responses)
@@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses)
     }
   end
 
-  def bulk_send(body_stream, batch_actions)
-    params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
+  def bulk_send(body_stream, batch_actions, headers = {})
+    params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }
 
     begin
       response = @pool.post(@bulk_path, params, body_stream.string)
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index 8e65ca13..072f25d0 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '12.0.5'
+  s.version = '12.0.6'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
  s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
diff --git a/spec/integration/outputs/compressed_indexing_spec.rb b/spec/integration/outputs/compressed_indexing_spec.rb
index c1c30550..f6bb2ad8 100644
--- a/spec/integration/outputs/compressed_indexing_spec.rb
+++ b/spec/integration/outputs/compressed_indexing_spec.rb
@@ -36,7 +36,9 @@
     {
       "Content-Encoding" => "gzip",
       "Content-Type" => "application/json",
-      'x-elastic-product-origin' => 'logstash-output-elasticsearch'
+      'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+      'X-Elastic-Event-Count' => anything,
+      'X-Elastic-Uncompressed-Request-Length' => anything,
     }
   }
 
diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb
index 35d031c4..496abdcc 100644
--- a/spec/integration/outputs/index_spec.rb
+++ b/spec/integration/outputs/index_spec.rb
@@ -215,12 +215,22 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
 
   it "sets the correct content-type header" do
     expected_manticore_opts = {
-      :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
+      :headers => {
+        "Content-Type" => "application/json",
+        'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+        'X-Elastic-Event-Count' => anything,
+        'X-Elastic-Uncompressed-Request-Length' => anything
+      },
       :body => anything
     }
     if secure
       expected_manticore_opts = {
-        :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
+        :headers => {
+          "Content-Type" => "application/json",
+          'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+          'X-Elastic-Event-Count' => anything,
+          'X-Elastic-Uncompressed-Request-Length' => anything
+        },
         :body => anything,
         :auth => {
           :user => user,
diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb
index d4cee8f3..c2a93ac1 100644
--- a/spec/unit/outputs/elasticsearch/http_client_spec.rb
+++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -270,6 +270,83 @@
       end
     end
 
+    context "the event count and uncompressed request size headers" do
+      let(:pool) { double("pool") }
+      let(:compression_level) { 6 }
+      let(:base_options) { super().merge( :client_settings => {:compression_level => compression_level}) }
+      let(:actions) { [
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message_1"=> message_1}],
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message_2"=> message_2}],
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message_3"=> message_3}],
+      ]}
+      let(:message_1) { "hello" }
+      let(:message_2_size) { 1_000 }
+      let(:message_2) { SecureRandom.alphanumeric(message_2_size / 2 ) * 2 }
+      let(:message_3_size) { 1_000 }
+      let(:message_3) { "m" * message_3_size }
+      let(:messages_size) { message_1.size + message_2.size + message_3.size }
+      let(:action_overhead) { 42 + 16 + 2 } # header plus doc key size plus new line overhead per action
+
+      let(:response) do
+        response = double("response")
+        allow(response).to receive(:code).and_return(response)
+        allow(response).to receive(:body).and_return({"errors" => false}.to_json)
+        response
+      end
+
+      before(:each) do
+        subject.instance_variable_set("@pool", pool)
+      end
+
+      it "carries bulk request's uncompressed size" do
+        expect(pool).to receive(:post) do |path, params, body|
+          headers = params.fetch(:headers, {})
+          expect(headers["X-Elastic-Event-Count"]).to eq("3")
+          expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
+        end.and_return(response)
+
+        subject.send(:bulk, actions)
+      end
+      context "without compression" do
+        let(:compression_level) { 0 }
+        it "carries bulk request's uncompressed size" do
+          expect(pool).to receive(:post) do |path, params, body|
+            headers = params.fetch(:headers, {})
+            expect(headers["X-Elastic-Event-Count"]).to eq("3")
+            expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
+          end.and_return(response)
+          subject.send(:bulk, actions)
+        end
+      end
+
+      context "with compressed messages over 20MB" do
+        let(:message_2_size) { 21_000_000 }
+        it "carries bulk request's uncompressed size" do
+          # only the first, tiny, message is sent first
+          expect(pool).to receive(:post) do |path, params, body|
+            headers = params.fetch(:headers, {})
+            expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_1.size + action_overhead).to_s
+            expect(headers["X-Elastic-Event-Count"]).to eq("1")
+          end.and_return(response)
+
+          # huge message_2 is sent afterwards alone
+          expect(pool).to receive(:post) do |path, params, body|
+            headers = params.fetch(:headers, {})
+            expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_2.size + action_overhead).to_s
+            expect(headers["X-Elastic-Event-Count"]).to eq("1")
+          end.and_return(response)
+
+          # finally medium message_3 is sent alone as well
+          expect(pool).to receive(:post) do |path, params, body|
+            headers = params.fetch(:headers, {})
+            expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_3.size + action_overhead).to_s
+            expect(headers["X-Elastic-Event-Count"]).to eq("1")
+          end.and_return(response)
+
+          subject.send(:bulk, actions)
+        end
+      end
+    end
   end
 
   describe "sniffing" do
diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb
index ac713c5c..5d1d6628 100644
--- a/spec/unit/outputs/elasticsearch_spec.rb
+++ b/spec/unit/outputs/elasticsearch_spec.rb
@@ -770,7 +770,7 @@
     end
 
     before(:each) do
-      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
+      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array), instance_of(Hash)) do |stream, actions, headers|
        expect( stream.string ).to include '"foo":"bar1"'
        expect( stream.string ).to include '"foo":"bar2"'
      end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely