Skip to content

Commit 0f9cb69

Browse files
rahulashokRahul Ashok
authored and committed
#38 - Support zlib compression
1 parent cc3bff8 commit 0f9cb69

File tree

3 files changed

+99
-8
lines changed

3 files changed

+99
-8
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@ fails all retries an error log will be emitted.
203203
Boolean, default is false.
204204
In case you find error `Encoding::UndefinedConversionError` with multibyte texts, you can avoid that error with this option.
205205

206+
### zlib_compression
207+
208+
Boolean, default is false.
209+
When enabled, the message data blob is compressed with zlib before it is sent to Kinesis.
210+
Each zlib-compressed message must remain within 1 megabyte in size.
211+
206212
### debug
207213

208214
Boolean. Enable if you need to debug Amazon Kinesis API call. Default is false.

lib/fluent/plugin/out_kinesis.rb

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
require 'logger'
1818
require 'securerandom'
1919
require 'fluent/plugin/version'
20+
require 'zlib'
2021

2122
module FluentPluginKinesis
2223
class OutputFilter < Fluent::BufferedOutput
@@ -56,6 +57,7 @@ class OutputFilter < Fluent::BufferedOutput
5657
config_param :order_events, :bool, default: false
5758
config_param :retries_on_putrecords, :integer, default: 3
5859
config_param :use_yajl, :bool, default: false
60+
config_param :zlib_compression, :bool, default: false
5961

6062
config_param :debug, :bool, default: false
6163

@@ -120,15 +122,15 @@ def format(tag, time, record)
120122
end
121123

122124
def write(chunk)
123-
data_list = chunk.to_enum(:msgpack_each).find_all{|record|
124-
unless record_exceeds_max_size?(record['data'])
125+
data_list = chunk.to_enum(:msgpack_each).map{|record|
126+
build_data_to_put(record)
127+
}.find_all{|record|
128+
unless record_exceeds_max_size?(record[:data])
125129
true
126130
else
127-
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record['data'])
131+
log.error sprintf('Record exceeds the %.3f KB(s) per-record size limit and will not be delivered: %s', PUT_RECORD_MAX_DATA_SIZE / 1024.0, record[:data])
128132
false
129133
end
130-
}.map{|record|
131-
build_data_to_put(record)
132134
}
133135

134136
if @order_events
@@ -220,7 +222,11 @@ def get_key(name, record)
220222
end
221223

222224
def build_data_to_put(data)
223-
Hash[data.map{|k, v| [k.to_sym, v] }]
225+
if @zlib_compression
226+
Hash[data.map{|k, v| [k.to_sym, k=="data" ? Zlib::Deflate.deflate(v) : v] }]
227+
else
228+
Hash[data.map{|k, v| [k.to_sym, v] }]
229+
end
224230
end
225231

226232
def put_record_for_order_events(data_list)

test/plugin/test_out_kinesis.rb

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ def setup
3030
use_yajl true
3131
]
3232

33+
CONFIG_WITH_COMPRESSION = CONFIG + %[
34+
zlib_compression true
35+
]
36+
37+
CONFIG_YAJL_WITH_COMPRESSION = CONFIG_YAJL + %[
38+
zlib_compression true
39+
]
40+
3341
def create_driver(conf = CONFIG, tag='test')
3442
Fluent::Test::BufferedOutputTestDriver
3543
.new(FluentPluginKinesis::OutputFilter, tag).configure(conf)
@@ -237,7 +245,7 @@ def test_mode_configuration
237245

238246

239247
data("json"=>CONFIG, "yajl"=>CONFIG_YAJL)
240-
def test_format(config)
248+
def test_format_without_compression(config)
241249

242250
d = create_driver(config)
243251

@@ -276,6 +284,46 @@ def test_format(config)
276284
d.run
277285
end
278286

287+
data("json"=>CONFIG_WITH_COMPRESSION, "yajl"=>CONFIG_YAJL_WITH_COMPRESSION)
288+
def test_format_with_compression(config)
289+
290+
d = create_driver(config)
291+
292+
data1 = {"test_partition_key"=>"key1","a"=>1,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
293+
data2 = {"test_partition_key"=>"key2","a"=>2,"time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
294+
295+
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
296+
d.emit(data1, time)
297+
d.emit(data2, time)
298+
299+
d.expect_format({
300+
'data' => data1.to_json,
301+
'partition_key' => 'key1' }.to_msgpack
302+
)
303+
d.expect_format({
304+
'data' => data2.to_json,
305+
'partition_key' => 'key2' }.to_msgpack
306+
)
307+
308+
client = create_mock_client
309+
client.describe_stream(stream_name: 'test_stream')
310+
client.put_records(
311+
stream_name: 'test_stream',
312+
records: [
313+
{
314+
data: Zlib::Deflate.deflate(data1.to_json),
315+
partition_key: 'key1'
316+
},
317+
{
318+
data: Zlib::Deflate.deflate(data2.to_json),
319+
partition_key: 'key2'
320+
}
321+
]
322+
) { {} }
323+
324+
d.run
325+
end
326+
279327
def test_order_events
280328

281329
d = create_driver(CONFIG + "\norder_events true")
@@ -349,7 +397,7 @@ def test_format_at_lowlevel_with_more_options
349397
)
350398
end
351399

352-
def test_multibyte_with_yajl
400+
def test_multibyte_with_yajl_without_compression
353401

354402
d = create_driver(CONFIG_YAJL)
355403

@@ -380,6 +428,37 @@ def test_multibyte_with_yajl
380428
d.run
381429
end
382430

431+
def test_multibyte_with_yajl_with_compression
432+
433+
d = create_driver(CONFIG_YAJL_WITH_COMPRESSION)
434+
435+
data1 = {"test_partition_key"=>"key1","a"=>"\xE3\x82\xA4\xE3\x83\xB3\xE3\x82\xB9\xE3\x83\x88\xE3\x83\xBC\xE3\x83\xAB","time"=>"2011-01-02T13:14:15Z","tag"=>"test"}
436+
json = Yajl.dump(data1)
437+
data1["a"].force_encoding("ASCII-8BIT")
438+
439+
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
440+
d.emit(data1, time)
441+
442+
d.expect_format({
443+
'data' => json,
444+
'partition_key' => 'key1' }.to_msgpack
445+
)
446+
447+
client = create_mock_client
448+
client.describe_stream(stream_name: 'test_stream')
449+
client.put_records(
450+
stream_name: 'test_stream',
451+
records: [
452+
{
453+
data: Zlib::Deflate.deflate(json),
454+
partition_key: 'key1'
455+
}
456+
]
457+
) { {} }
458+
459+
d.run
460+
end
461+
383462
def test_get_key
384463
d = create_driver
385464
assert_equal(

0 commit comments

Comments
 (0)