Skip to content

Commit 4788faf

Browse files
authored
feat: reuse decompressed buffer to avoid allocating buffer whenever a batch is read (#920)
1 parent 294fbdb commit 4788faf

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

message_reader.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type messageSetReader struct {
2121
//
2222
// This is used to detect truncation of the response.
2323
lengthRemain int
24+
25+
decompressed bytes.Buffer
2426
}
2527

2628
type readerStack struct {
@@ -162,14 +164,15 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
162164
if err = r.discardN(4); err != nil {
163165
return
164166
}
167+
165168
// read and decompress the contained message set.
166-
var decompressed bytes.Buffer
167-
if err = r.readBytesWith(func(r *bufio.Reader, sz int, n int) (remain int, err error) {
169+
r.decompressed.Reset()
170+
if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) {
168171
// x4 as a guess that the average compression ratio is near 75%
169-
decompressed.Grow(4 * n)
170-
limitReader := io.LimitedReader{R: r, N: int64(n)}
172+
r.decompressed.Grow(4 * n)
173+
limitReader := io.LimitedReader{R: br, N: int64(n)}
171174
codecReader := codec.NewReader(&limitReader)
172-
_, err = decompressed.ReadFrom(codecReader)
175+
_, err = r.decompressed.ReadFrom(codecReader)
173176
remain = sz - (n - int(limitReader.N))
174177
codecReader.Close()
175178
return
@@ -184,7 +187,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
184187
// messages at offsets 10-13, then the container message will have
185188
// offset 13 and the contained messages will be 0,1,2,3. the base
186189
// offset for the container, then is 13-3=10.
187-
if offset, err = extractOffset(offset, decompressed.Bytes()); err != nil {
190+
if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil {
188191
return
189192
}
190193

@@ -196,8 +199,8 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
196199
// Allocate a buffer of size 0, which gets capped at 16 bytes
197200
// by the bufio package. We are already reading buffered data
198201
// here, no need to reserve another 4KB buffer.
199-
reader: bufio.NewReaderSize(&decompressed, 0),
200-
remain: decompressed.Len(),
202+
reader: bufio.NewReaderSize(&r.decompressed, 0),
203+
remain: r.decompressed.Len(),
201204
base: offset,
202205
parent: r.readerStack,
203206
}
@@ -263,19 +266,20 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
263266
err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
264267
return
265268
}
266-
var decompressed bytes.Buffer
267-
decompressed.Grow(4 * batchRemain)
269+
r.decompressed.Reset()
270+
// x4 as a guess that the average compression ratio is near 75%
271+
r.decompressed.Grow(4 * batchRemain)
268272
limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
269273
codecReader := codec.NewReader(&limitReader)
270-
_, err = decompressed.ReadFrom(codecReader)
274+
_, err = r.decompressed.ReadFrom(codecReader)
271275
codecReader.Close()
272276
if err != nil {
273277
return
274278
}
275279
r.remain -= batchRemain - int(limitReader.N)
276280
r.readerStack = &readerStack{
277-
reader: bufio.NewReaderSize(&decompressed, 0), // the new stack reads from the decompressed buffer
278-
remain: decompressed.Len(),
281+
reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer
282+
remain: r.decompressed.Len(),
279283
base: -1, // base is unused here
280284
parent: r.readerStack,
281285
header: r.header,

0 commit comments

Comments
 (0)