Skip to content

Commit 89f3edd

Browse files
committed
Increase test coverage for commit function.
1 parent f74a1c8 commit 89f3edd

File tree

2 files changed

+74
-15
lines changed

2 files changed

+74
-15
lines changed

reader.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
184184
offsetStash.removeGenerationID(generationID)
185185
illegalGenerationErr = true
186186
err = nil
187+
break
187188
}
188189
}
189190
}
@@ -228,11 +229,14 @@ func (o offsetStash) reset() {
228229
}
229230

230231
func (o offsetStash) removeGenerationID(genID int32) {
231-
for _, offsetsForTopic := range o {
232+
for topic, offsetsForTopic := range o {
232233
for partition, offsetsForPartition := range offsetsForTopic {
233234
if offsetsForPartition.generationID == genID {
234235
delete(offsetsForTopic, partition)
235236
}
237+
if len(offsetsForTopic) == 0 {
238+
delete(o, topic)
239+
}
236240
}
237241
}
238242
}

reader_test.go

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,36 +1451,87 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
14511451
}
14521452

14531453
func TestCommitOffsetsWithRetry(t *testing.T) {
1454-
offsets := offsetStash{"topic": {0: {0, 1}}}
1454+
offsets := func() offsetStash {
1455+
return offsetStash{"topic": {0: {0, 1}}}
1456+
}
14551457

14561458
tests := map[string]struct {
1457-
Fails int
1458-
Invocations int
1459-
HasError bool
1459+
Fails int
1460+
Invocations int
1461+
HasError bool
1462+
Error error
1463+
Offsets offsetStash
1464+
Config ReaderConfig
1465+
ExpectedOffsets offsetStash
14601466
}{
14611467
"happy path": {
1462-
Invocations: 1,
1468+
Invocations: 1,
1469+
Error: io.EOF,
1470+
Offsets: offsets(),
1471+
ExpectedOffsets: offsets(),
14631472
},
14641473
"1 retry": {
1465-
Fails: 1,
1466-
Invocations: 2,
1474+
Fails: 1,
1475+
Invocations: 2,
1476+
Error: io.EOF,
1477+
Offsets: offsets(),
1478+
ExpectedOffsets: offsets(),
14671479
},
14681480
"out of retries": {
1469-
Fails: defaultCommitRetries + 1,
1470-
Invocations: defaultCommitRetries,
1471-
HasError: true,
1481+
Fails: defaultCommitRetries + 1,
1482+
Invocations: defaultCommitRetries,
1483+
HasError: true,
1484+
Error: io.EOF,
1485+
Offsets: offsets(),
1486+
ExpectedOffsets: offsets(),
1487+
},
1488+
"illegal generation error only 1 generation": {
1489+
Fails: 1,
1490+
Invocations: 1,
1491+
Error: IllegalGeneration,
1492+
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
1493+
ExpectedOffsets: offsetStash{},
1494+
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
1495+
},
1496+
"illegal generation error only 2 generations": {
1497+
Fails: 1,
1498+
Invocations: 1,
1499+
Error: IllegalGeneration,
1500+
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
1501+
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
1502+
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
1503+
},
1504+
"illegal generation error only 1 generation - error propagation": {
1505+
Fails: 1,
1506+
Invocations: 1,
1507+
Error: IllegalGeneration,
1508+
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
1509+
ExpectedOffsets: offsetStash{},
1510+
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
1511+
HasError: true,
1512+
},
1513+
"illegal generation error only 2 generations - error propagation": {
1514+
Fails: 1,
1515+
Invocations: 1,
1516+
Error: IllegalGeneration,
1517+
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
1518+
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
1519+
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
1520+
HasError: true,
14721521
},
14731522
}
14741523

14751524
for label, test := range tests {
14761525
t.Run(label, func(t *testing.T) {
1526+
requests := make([]offsetCommitRequestV2, 0)
14771527
count := 0
14781528
gen := &Generation{
14791529
conn: mockCoordinator{
1480-
offsetCommitFunc: func(offsetCommitRequestV2) (offsetCommitResponseV2, error) {
1530+
offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
1531+
requests = append(requests, r)
14811532
count++
14821533
if count <= test.Fails {
1483-
return offsetCommitResponseV2{}, io.EOF
1534+
return offsetCommitResponseV2{}, test.Error
14841535
}
14851536
return offsetCommitResponseV2{}, nil
14861537
},
@@ -1491,13 +1542,17 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
14911542
Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}},
14921543
}
14931544

1494-
r := &Reader{stctx: context.Background()}
1495-
err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
1545+
r := &Reader{stctx: context.Background(), config: test.Config}
1546+
err := r.commitOffsetsWithRetry(gen, test.Offsets, defaultCommitRetries)
14961547
switch {
14971548
case test.HasError && err == nil:
14981549
t.Error("bad err: expected not nil; got nil")
14991550
case !test.HasError && err != nil:
15001551
t.Errorf("bad err: expected nil; got %v", err)
1552+
default:
1553+
if !reflect.DeepEqual(test.ExpectedOffsets, test.Offsets) {
1554+
t.Errorf("bad expected offsets: expected %+v; got %v", test.ExpectedOffsets, test.Offsets)
1555+
}
15011556
}
15021557
})
15031558
}

0 commit comments

Comments
 (0)