Skip to content

Commit 963a7b6

Browse files
committed
Merge pull request #39 from rahul007ashok/38-gzip-compression-support
#38 - Support zlib compression
2 parents 93cc026 + 0f9cb69 commit 963a7b6

File tree

3 files changed

+99
-8
lines changed

3 files changed

+99
-8
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ fails all retries an error log will be emitted.
207207
Boolean, default is false.
208208
In case you find error `Encoding::UndefinedConversionError` with multibyte texts, you can avoid that error with this option.
209209

210+
### zlib_compression
211+
212+
Boolean, default is false.
213+
When enabled, the message data blob is compressed with zlib before it is sent.
214+
Each zlib-compressed message must still be within 1 megabyte in size.
215+
210216
### debug
211217

212218
Boolean. Enable if you need to debug Amazon Kinesis API call. Default is false.

lib/fluent/plugin/out_kinesis.rb

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
require 'logger'
1818
require 'securerandom'
1919
require 'fluent/plugin/version'
20+
require 'zlib'
2021

2122
module FluentPluginKinesis
2223
class OutputFilter < Fluent::BufferedOutput
@@ -57,6 +58,7 @@ class OutputFilter < Fluent::BufferedOutput
5758
config_param :order_events, :bool, default: false
5859
config_param :retries_on_putrecords, :integer, default: 3
5960
config_param :use_yajl, :bool, default: false
61+
config_param :zlib_compression, :bool, default: false
6062

6163
config_param :debug, :bool, default: false
6264

@@ -123,15 +125,15 @@ def format(tag, time, record)
123125
end
124126

125127
def write(chunk)
126-
data_list = chunk.to_enum(:msgpack_each).find_all{|record|
127-
unless record_exceeds_max_size?(record['data'])
128+
data_list = chunk.to_enum(:msgpack_each).map{|record|
129+
build_data_to_put(record)
130+
}.find_all{|record|
131+
unless record_exceeds_max_size?(record[:data])
128132
true
129133
else
130-
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record['data'])
134+
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data])
131135
false
132136
end
133-
}.map{|record|
134-
build_data_to_put(record)
135137
}
136138

137139
if @order_events
@@ -223,7 +225,11 @@ def get_key(name, record)
223225
end
224226

225227
def build_data_to_put(data)
226-
Hash[data.map{|k, v| [k.to_sym, v] }]
228+
if @zlib_compression
229+
Hash[data.map{|k, v| [k.to_sym, k=="data" ? Zlib::Deflate.deflate(v) : v] }]
230+
else
231+
Hash[data.map{|k, v| [k.to_sym, v] }]
232+
end
227233
end
228234

229235
def put_record_for_order_events(data_list)

test/plugin/test_out_kinesis.rb

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ def setup
3030
use_yajl true
3131
]
3232

33+
CONFIG_WITH_COMPRESSION = CONFIG + %[
34+
zlib_compression true
35+
]
36+
37+
CONFIG_YAJL_WITH_COMPRESSION = CONFIG_YAJL + %[
38+
zlib_compression true
39+
]
40+
3341
def create_driver(conf = CONFIG, tag='test')
3442
Fluent::Test::BufferedOutputTestDriver
3543
.new(FluentPluginKinesis::OutputFilter, tag).configure(conf)
@@ -239,7 +247,7 @@ def test_mode_configuration
239247

240248

241249
data("json"=>CONFIG, "yajl"=>CONFIG_YAJL)
242-
def test_format(config)
250+
def test_format_without_compression(config)
243251

244252
d = create_driver(config)
245253

@@ -278,6 +286,46 @@ def test_format(config)
278286
d.run
279287
end
280288

289+
data("json"=>CONFIG_WITH_COMPRESSION, "yajl"=>CONFIG_YAJL_WITH_COMPRESSION)
290+
def test_format_with_compression(config)
291+
292+
d = create_driver(config)
293+
294+
data1 = {"test_partition_key"=>"key1","a"=>1,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
295+
data2 = {"test_partition_key"=>"key2","a"=>2,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
296+
297+
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
298+
d.emit(data1, time)
299+
d.emit(data2, time)
300+
301+
d.expect_format({
302+
'data' => data1.to_json,
303+
'partition_key' => 'key1' }.to_msgpack
304+
)
305+
d.expect_format({
306+
'data' => data2.to_json,
307+
'partition_key' => 'key2' }.to_msgpack
308+
)
309+
310+
client = create_mock_client
311+
client.describe_stream(stream_name: 'test_stream')
312+
client.put_records(
313+
stream_name: 'test_stream',
314+
records: [
315+
{
316+
data: Zlib::Deflate.deflate(data1.to_json),
317+
partition_key: 'key1'
318+
},
319+
{
320+
data: Zlib::Deflate.deflate(data2.to_json),
321+
partition_key: 'key2'
322+
}
323+
]
324+
) { {} }
325+
326+
d.run
327+
end
328+
281329
def test_order_events
282330

283331
d = create_driver(CONFIG + "\norder_events true")
@@ -351,7 +399,7 @@ def test_format_at_lowlevel_with_more_options
351399
)
352400
end
353401

354-
def test_multibyte_with_yajl
402+
def test_multibyte_with_yajl_without_compression
355403

356404
d = create_driver(CONFIG_YAJL)
357405

@@ -382,6 +430,37 @@ def test_multibyte_with_yajl
382430
d.run
383431
end
384432

433+
def test_multibyte_with_yajl_with_compression
434+
435+
d = create_driver(CONFIG_YAJL_WITH_COMPRESSION)
436+
437+
data1 = {"test_partition_key"=>"key1","a"=>"\xE3\x82\xA4\xE3\x83\xB3\xE3\x82\xB9\xE3\x83\x88\xE3\x83\xBC\xE3\x83\xAB","time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
438+
json = Yajl.dump(data1)
439+
data1["a"].force_encoding("ASCII-8BIT")
440+
441+
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
442+
d.emit(data1, time)
443+
444+
d.expect_format({
445+
'data' => json,
446+
'partition_key' => 'key1' }.to_msgpack
447+
)
448+
449+
client = create_mock_client
450+
client.describe_stream(stream_name: 'test_stream')
451+
client.put_records(
452+
stream_name: 'test_stream',
453+
records: [
454+
{
455+
data: Zlib::Deflate.deflate(json),
456+
partition_key: 'key1'
457+
}
458+
]
459+
) { {} }
460+
461+
d.run
462+
end
463+
385464
def test_get_key
386465
d = create_driver
387466
assert_equal(

0 commit comments

Comments
 (0)