Add method to fetch messages in batch #1390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 1 commit into main
62 changes: 62 additions & 0 deletions reader.go
@@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
}
}

// FetchMessageBatch fetches a batch of messages from the reader. It is similar to
// FetchMessage, except that it blocks until the number of messages read reaches
// batchSize.
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
	r.activateReadLag()
	msgBatch := make([]Message, 0, batchSize)

	var i int
	for i < batchSize {
		r.mutex.Lock()

		if !r.closed && r.version == 0 {
			r.start(r.getTopicPartitionOffset())
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return []Message{}, ctx.Err()

		case err := <-r.runError:
			return []Message{}, err

		case m, ok := <-r.msgs:
			if !ok {
				return []Message{}, io.EOF
			}

			if m.version < version {
				continue
			}

			r.mutex.Lock()

			switch {
			case m.error != nil:
			case version == r.version:
				r.offset = m.message.Offset + 1
				r.lag = m.watermark - r.offset
			}

			r.mutex.Unlock()

			if errors.Is(m.error, io.EOF) {
				// io.EOF is used as a marker to indicate that the stream
				// has been closed. In case it was received from the inner
				// reader, we don't want to confuse the program, so we
				// replace the error with io.ErrUnexpectedEOF.
				m.error = io.ErrUnexpectedEOF
			}
			if m.error != nil {
				return nil, m.error
			}

			msgBatch = append(msgBatch, m.message)
		}
		// i is only incremented after a message is appended, so the loop
		// runs until exactly batchSize messages have been collected.
		i++
	}
	return msgBatch, nil
}
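For context, a minimal sketch (not part of the diff) of how a caller might combine the proposed method with the existing CommitMessages; process is a hypothetical per-message handler and the batch size is arbitrary:

// Usage sketch for the proposed API; process is a hypothetical handler.
msgs, err := r.FetchMessageBatch(ctx, 100)
if err != nil {
	return err
}
for _, m := range msgs {
	if err := process(m); err != nil {
		return err
	}
}
// Commit the whole batch only after every message was handled.
return r.CommitMessages(ctx, msgs...)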
@krsoninikhil krsoninikhil (Author) Jun 8, 2025

Duplication of the code can be avoided by calling this method in FetchMessage. I'll refactor once the approach gets reviewed.
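A rough sketch of what that refactor could look like inside reader.go (hypothetical, not part of this commit): FetchMessage becomes a thin wrapper over FetchMessageBatch with a batch size of one.

// Hypothetical refactor: FetchMessage delegates to FetchMessageBatch
// so the fetch loop lives in a single place.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	batch, err := r.FetchMessageBatch(ctx, 1)
	if err != nil {
		return Message{}, err
	}
	return batch[0], nil
}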

@ghaninia ghaninia Jun 8, 2025

What happens if the number of messages doesn't reach the desired batchSize?

You advance the offset as the batch is read; what happens if one of the messages in the batch fails during processing? Is there any mechanism in place to handle that? Do you have any ideas for a fallback strategy?


@ghaninia for batch processing it's possible to use manual acks, maybe?

If one of the messages fails, we can ack all the messages before the failed one, excluding the message with the problem.
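A minimal sketch of that strategy, assuming explicit commits (a GroupID is set) and a hypothetical processOne handler:

// Ack only the messages processed before the first failure; the failed
// message and the rest of the batch will be fetched again.
func handleBatch(ctx context.Context, r *kafka.Reader, batch []kafka.Message,
	processOne func(kafka.Message) error) error {

	for i, m := range batch {
		if err := processOne(m); err != nil {
			// Commit the successful prefix so it is not redelivered,
			// then surface the failure to the caller.
			if i > 0 {
				if cerr := r.CommitMessages(ctx, batch[:i]...); cerr != nil {
					return cerr
				}
			}
			return err
		}
	}
	// The whole batch succeeded: ack everything.
	return r.CommitMessages(ctx, batch...)
}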

@krsoninikhil (Author)

> What happens if one of the messages in the batch fails?

We should let the consumer decide how they want to handle it. They can commit what succeeded or read again from the last commit. Let me know if there is a better approach.

> If one of the messages fails, we can ack all the messages before the failed one.

This sounds good; we can do that. My only concern: if a consumer is processing batch by batch, it might be confusing that part of the batch is committed, so the batch is neither aborted nor fully committed.

> What happens if the number of messages doesn't reach the desired batchSize?

I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method returns when some messages are available but not a full batch.
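Until such a ticker exists inside the reader, the same behavior can be approximated on top of the current public API; a sketch, assuming a per-batch maxWait deadline:

// Collect up to batchSize messages, but return whatever has been read
// once maxWait expires instead of blocking for a full batch.
func fetchBatch(ctx context.Context, r *kafka.Reader, batchSize int, maxWait time.Duration) ([]kafka.Message, error) {
	batchCtx, cancel := context.WithTimeout(ctx, maxWait)
	defer cancel()

	batch := make([]kafka.Message, 0, batchSize)
	for len(batch) < batchSize {
		m, err := r.FetchMessage(batchCtx)
		if errors.Is(err, context.DeadlineExceeded) {
			return batch, nil // max wait reached: hand back the partial batch
		}
		if err != nil {
			return batch, err
		}
		batch = append(batch, m)
	}
	return batch, nil
}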

@krsoninikhil (Author)

@max107 @ghaninia let me know your thoughts.


@krsoninikhil in my opinion:

> This sounds good; we can do that. My only concern: if a consumer is processing batch by batch, it might be confusing that part of the batch is committed, so the batch is neither aborted nor fully committed.

It's completely normal behavior. If a process can't handle a message correctly, it should raise a panic / return an error / stop consuming and ack the last successfully processed message. What happens next (restart the consumer? panic?) is the developer's decision.

So imagine this situation: we receive 3 of 10 (batchSize) messages and have not exceeded the deadline; we successfully handle the first 2 messages but fail on the 3rd. We know we can't handle message 3, so we ack 1 and 2. On the next attempt we receive messages 3, 4, 5, and so on, and can try again.

> I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method returns when some messages are available but not a full batch.

That also sounds very good, because all "batch processing" is a compromise between a timeout and batchSize.

Sorry, English is not my first language.

@max107 max107 Jul 12, 2025

So, my simple batch consumer looks like:

package kafkamux

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
)

var (
	ErrSmallQueueCapacity = errors.New("batch lower than queue capacity")
)

// Reader is the subset of *kafka.Reader the consumer needs; declaring it
// as an interface keeps the consumer mockable in tests.
type Reader interface {
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

// BatchCallback is invoked with every flushed batch of messages.
// (It stands in for the handler.BatchCallback type from my project.)
type BatchCallback func(ctx context.Context, msgs []kafka.Message) error

func NewBatchConsumer(
	reader Reader,
	batchSize int,
	duration time.Duration,
) (*BatchConsumer, error) {
	// The reader's internal queue must hold at least one full batch.
	if r, ok := reader.(*kafka.Reader); ok && r.Config().QueueCapacity < batchSize {
		return nil, ErrSmallQueueCapacity
	}

	return &BatchConsumer{
		reader:    reader,
		batchSize: batchSize,
		messages:  make([]kafka.Message, 0, batchSize),
		duration:  duration,
	}, nil
}

type BatchConsumer struct {
	reader    Reader
	batchSize int
	duration  time.Duration
	l         sync.Mutex
	messages  []kafka.Message
}

// flush hands the buffered messages to the callback and commits them,
// so a batch is either fully acked or not acked at all.
func (b *BatchConsumer) flush(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)

	b.l.Lock()
	defer b.l.Unlock()

	if len(b.messages) == 0 {
		return nil
	}

	if err := fn(ctx, b.messages); err != nil {
		l.Err(err).Msg("error in callback")
		return err // (my project wraps errors with its own werr package here)
	}

	if err := b.reader.CommitMessages(ctx, b.messages...); err != nil {
		l.Err(err).Msg("error in commit messages")
		return err
	}

	b.messages = make([]kafka.Message, 0, b.batchSize)

	return nil
}

func (b *BatchConsumer) Listen(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)

	errCh := make(chan error, 1)
	msgCh := make(chan kafka.Message, b.batchSize)

	ticker := time.NewTicker(b.duration)
	defer ticker.Stop()

	go func() {
		defer close(msgCh)

		for {
			msg, err := b.reader.FetchMessage(ctx)
			if err != nil {
				errCh <- err
				return
			}

			msgCh <- msg
		}
	}()

	for {
		select {
		case readErr := <-errCh:
			l.Err(readErr).Msg("read message error, stop main loop")
			return nil

		case <-ctx.Done():
			l.Debug().Msg("context done, stop main loop")
			return nil

		case <-ticker.C:
			l.Debug().Int("messages_count", len(b.messages)).Msg("ticker flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return err
			}

		case msg, ok := <-msgCh:
			if !ok {
				continue
			}

			b.messages = append(b.messages, msg)

			if len(b.messages) < b.batchSize {
				l.Debug().Int("messages_count", len(b.messages)).Msg("not enough messages, wait")
				continue
			}

			l.Info().Int("messages_count", len(b.messages)).Msg("main loop flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return err
			}

			ticker.Reset(b.duration)
		}
	}
}

This ^ consumer either acks all messages or does nothing, because an error happened. I'm not sure my solution is correct in general, but for my project, with idempotent handlers, it's okay.

So if we had the ability to fetch messages with a batch size, it would help in many situations.
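For reference, a hypothetical wiring of the consumer above (assuming it lives in the same package; broker address, topic, and group id are placeholders):

// Example wiring; QueueCapacity must be at least batchSize, otherwise
// NewBatchConsumer returns ErrSmallQueueCapacity.
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:       []string{"localhost:9092"},
	GroupID:       "example-group",
	Topic:         "example-topic",
	QueueCapacity: 100,
})
defer r.Close()

c, err := NewBatchConsumer(r, 100, 5*time.Second)
if err != nil {
	panic(err)
}

// Listen blocks; a batch is committed only if the callback returns nil.
err = c.Listen(context.Background(), func(ctx context.Context, msgs []kafka.Message) error {
	for range msgs {
		// process each message idempotently here
	}
	return nil
})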


@krsoninikhil please check PR #1395, which adds deadline timeout support.


// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.