Skip to content

Commit 382e96d

Browse files
Steve van Loben Selsrhansen2
andauthored
Do a final commit on end consumer group generation for immediate commits (#715)
* Do a final commit on end consumer group generation for immediate commits * add test for final commit on generation end Co-authored-by: rhansen2 <rob.hansen@sendgrid.com>
1 parent d81d37a commit 382e96d

File tree

2 files changed

+67
-2
lines changed

2 files changed

+67
-2
lines changed

reader.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,25 @@ func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
212212
for {
213213
select {
214214
case <-ctx.Done():
215+
// drain the commit channel and prepare a single, final commit.
216+
// the commit will combine any outstanding requests and the result
217+
// will be sent back to all the callers of CommitMessages so that
218+
// they can return.
219+
var errchs []chan<- error
220+
for hasCommits := true; hasCommits; {
221+
select {
222+
case req := <-r.commits:
223+
offsets.merge(req.commits)
224+
errchs = append(errchs, req.errch)
225+
default:
226+
hasCommits = false
227+
}
228+
}
229+
err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
230+
for _, errch := range errchs {
231+
// NOTE : this will be a buffered channel and will not block.
232+
errch <- err
233+
}
215234
return
216235

217236
case req := <-r.commits:

reader_test.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func makeTestSequence(n int) []Message {
495495
}
496496

497497
func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) {
498-
var config = r.Config()
498+
config := r.Config()
499499
var conn *Conn
500500
var err error
501501

@@ -710,7 +710,6 @@ func TestReaderPartitionWhenConsumerGroupsEnabled(t *testing.T) {
710710
if !invoke() {
711711
t.Fatalf("expected panic; but NewReader worked?!")
712712
}
713-
714713
}
715714

716715
func TestExtractTopics(t *testing.T) {
@@ -1281,6 +1280,53 @@ func TestValidateReader(t *testing.T) {
12811280
}
12821281
}
12831282

1283+
func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
1284+
t.Parallel()
1285+
var committedOffset int64
1286+
var commitCount int
1287+
gen := &Generation{
1288+
conn: mockCoordinator{
1289+
offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
1290+
commitCount++
1291+
committedOffset = r.Topics[0].Partitions[0].Offset
1292+
return offsetCommitResponseV2{}, nil
1293+
},
1294+
},
1295+
done: make(chan struct{}),
1296+
log: func(func(Logger)) {},
1297+
logError: func(func(Logger)) {},
1298+
}
1299+
1300+
// initialize commits so that the commitLoopImmediate select statement blocks
1301+
r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)}
1302+
1303+
for i := 0; i < 100; i++ {
1304+
cr := commitRequest{
1305+
commits: []commit{{
1306+
topic: "topic",
1307+
partition: 0,
1308+
offset: int64(i) + 1,
1309+
}},
1310+
errch: make(chan<- error, 1),
1311+
}
1312+
r.commits <- cr
1313+
}
1314+
1315+
gen.Start(func(ctx context.Context) {
1316+
r.commitLoopImmediate(ctx, gen)
1317+
})
1318+
1319+
gen.close()
1320+
1321+
if committedOffset != 100 {
1322+
t.Fatalf("expected commited offset to be 100 but got %d", committedOffset)
1323+
}
1324+
1325+
if commitCount >= 100 {
1326+
t.Fatalf("expected a single final commit on generation end got %d", commitCount)
1327+
}
1328+
}
1329+
12841330
func TestCommitOffsetsWithRetry(t *testing.T) {
12851331
offsets := offsetStash{"topic": {0: 0}}
12861332

0 commit comments

Comments
 (0)