Skip to content

Commit eebea66

Browse files
author
Achille
authored
document CommitOffsets behavior (#785)
1 parent 88b462b commit eebea66

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,13 @@ for {
328328
}
329329
```
330330

331+
When committing messages in consumer groups, the message with the highest offset
332+
for a given topic/partition determines the value of the committed offset for
333+
that partition. For example, if messages at offset 1, 2, and 3 of a single
334+
partition were retrieved by call to `FetchMessage`, calling `CommitMessages`
335+
with message offset 3 will also result in committing the messages at offsets 1
336+
and 2 for that partition.
337+
331338
### Managing Commits
332339

333340
By default, CommitMessages will synchronously commit offsets to Kafka. For

reader.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,15 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
847847
// CommitMessages commits the list of messages passed as argument. The program
848848
// may pass a context to asynchronously cancel the commit operation when it was
849849
// configured to be blocking.
850+
//
851+
// Because kafka consumer groups track a single offset per partition, the
852+
// highest message offset passed to CommitMessages will cause all previous
853+
// messages to be committed. Applications need to account for these Kafka
854+
// limitations when committing messages, and maintain message ordering if they
855+
// need strong delivery guarantees. This property makes it valid to pass only
856+
// the last message seen to CommitMessages in order to move the offset of the
857+
// topic/partition it belonged to forward, effectively committing all previous
858+
// messages in the partition.
850859
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
851860
if !r.useConsumerGroup() {
852861
return errOnlyAvailableWithGroup

0 commit comments

Comments
 (0)