Skip to content

Commit b770811

Browse files
committed
Add method to fetch messages in batch
Since FetchMessage is already reading messages from a fetched batch, this new method just hold the messages util the batchSize number of messages are read. Fixes #123
1 parent af1725f commit b770811

File tree

2 files changed

+62
-2
lines changed

2 files changed

+62
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/segmentio/kafka-go
22

3-
go 1.23
3+
go 1.23.0
44

55
require (
66
github.com/klauspost/compress v1.15.9

reader.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,66 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
912912
}
913913
}
914914

915+
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
916+
r.activateReadLag()
917+
msgBatch := make([]Message, 0, batchSize)
918+
919+
var i int
920+
for i <= batchSize {
921+
r.mutex.Lock()
922+
923+
if !r.closed && r.version == 0 {
924+
r.start(r.getTopicPartitionOffset())
925+
}
926+
927+
version := r.version
928+
r.mutex.Unlock()
929+
930+
select {
931+
case <-ctx.Done():
932+
return []Message{}, ctx.Err()
933+
934+
case err := <-r.runError:
935+
return []Message{}, err
936+
937+
case m, ok := <-r.msgs:
938+
if !ok {
939+
return []Message{}, io.EOF
940+
}
941+
942+
if m.version < version {
943+
continue
944+
}
945+
946+
r.mutex.Lock()
947+
948+
switch {
949+
case m.error != nil:
950+
case version == r.version:
951+
r.offset = m.message.Offset + 1
952+
r.lag = m.watermark - r.offset
953+
}
954+
955+
r.mutex.Unlock()
956+
957+
if errors.Is(m.error, io.EOF) {
958+
// io.EOF is used as a marker to indicate that the stream
959+
// has been closed, in case it was received from the inner
960+
// reader we don't want to confuse the program and replace
961+
// the error with io.ErrUnexpectedEOF.
962+
m.error = io.ErrUnexpectedEOF
963+
}
964+
if m.error != nil {
965+
return nil, m.error
966+
}
967+
968+
msgBatch = append(msgBatch, m.message)
969+
}
970+
i++
971+
}
972+
return msgBatch, nil
973+
}
974+
915975
// ReadLag returns the current lag of the reader by fetching the last offset of
916976
// the topic and partition and computing the difference between that value and
917977
// the offset of the last message returned by ReadMessage.
@@ -1487,7 +1547,7 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
14871547
return
14881548
}
14891549

1490-
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1550+
func (r *reader) read(ctx context.Context, offset int64, conn *Conn, batchSize int) (int64, error) {
14911551
r.stats.fetches.observe(1)
14921552
r.stats.offset.observe(offset)
14931553

0 commit comments

Comments
 (0)