Skip to content

Commit da91759

Browse files
Reader: allow config to return OffsetOutOfRange errors (#917)
1 parent e7c2c10 commit da91759

File tree

5 files changed

+63
-6
lines changed

5 files changed

+63
-6
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ module github.com/segmentio/kafka-go
33
go 1.15
44

55
require (
6+
github.com/davecgh/go-spew v1.1.1 // indirect
67
github.com/klauspost/compress v1.14.2
78
github.com/pierrec/lz4/v4 v4.1.14
8-
github.com/stretchr/testify v1.6.1
9+
github.com/stretchr/testify v1.7.1
910
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
1011
github.com/xdg/stringprep v1.0.0 // indirect
1112
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
1213
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
14+
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 // indirect
1315
)

go.sum

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
21
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
34
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
45
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
56
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
67
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
78
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
89
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
910
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
10-
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
11-
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
11+
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
12+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
1213
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
1314
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
1415
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
@@ -24,5 +25,6 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
2425
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
2526
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2627
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
27-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
2828
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
29+
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc=
30+
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

kafka_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,5 +183,9 @@ func newTestKafkaLogger(t *testing.T, prefix string) Logger {
183183

184184
func (l *testKafkaLogger) Printf(msg string, args ...interface{}) {
185185
l.T.Helper()
186-
l.T.Logf(l.Prefix+" "+msg, args...)
186+
if l.Prefix != "" {
187+
l.T.Logf(l.Prefix+" "+msg, args...)
188+
} else {
189+
l.T.Logf(msg, args...)
190+
}
187191
}

reader.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,12 @@ type ReaderConfig struct {
509509
//
510510
// The default is to try 3 times.
511511
MaxAttempts int
512+
513+
// OffsetOutOfRangeError indicates that the reader should return an error in
514+
// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
515+
// This flag is being added to retain backwards-compatibility, so it will be
516+
// removed in a future version of kafka-go.
517+
OffsetOutOfRangeError bool
512518
}
513519

514520
// Validate method validates ReaderConfig properties.
@@ -1191,6 +1197,9 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
11911197
stats: r.stats,
11921198
isolationLevel: r.config.IsolationLevel,
11931199
maxAttempts: r.config.MaxAttempts,
1200+
1201+
// backwards-compatibility flags
1202+
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
11941203
}).run(ctx, offset)
11951204
}(ctx, key, offset, &r.join)
11961205
}
@@ -1216,6 +1225,8 @@ type reader struct {
12161225
stats *readerStats
12171226
isolationLevel IsolationLevel
12181227
maxAttempts int
1228+
1229+
offsetOutOfRangeError bool
12191230
}
12201231

12211232
type readerMessage struct {
@@ -1249,12 +1260,18 @@ func (r *reader) run(ctx context.Context, offset int64) {
12491260
conn, start, err := r.initialize(ctx, offset)
12501261
if err != nil {
12511262
if errors.Is(err, OffsetOutOfRange) {
1263+
if r.offsetOutOfRangeError {
1264+
r.sendError(ctx, err)
1265+
return
1266+
}
1267+
12521268
// This would happen if the requested offset is passed the last
12531269
// offset on the partition leader. In that case we're just going
12541270
// to retry later hoping that enough data has been produced.
12551271
r.withErrorLogger(func(log Logger) {
12561272
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
12571273
})
1274+
12581275
continue
12591276
}
12601277

reader_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ func TestReader(t *testing.T) {
6262
scenario: "reading from an out-of-range offset waits until the context is cancelled",
6363
function: testReaderOutOfRangeGetsCanceled,
6464
},
65+
66+
{
67+
scenario: "topic being recreated will return an error",
68+
function: testReaderTopicRecreated,
69+
},
6570
}
6671

6772
for _, test := range tests {
@@ -78,6 +83,7 @@ func TestReader(t *testing.T) {
7883
MinBytes: 1,
7984
MaxBytes: 10e6,
8085
MaxWait: 100 * time.Millisecond,
86+
Logger: newTestKafkaLogger(t, ""),
8187
})
8288
defer r.Close()
8389
testFunc(t, ctx, r)
@@ -1950,3 +1956,29 @@ func createTopicWithCompaction(t *testing.T, topic string, partitions int) {
19501956
defer cancel()
19511957
waitForTopic(ctx, t, topic)
19521958
}
1959+
1960+
// The current behavior of the Reader is to retry OffsetOutOfRange errors
1961+
// indefinitely, which results in programs hanging in the event of a topic being
1962+
// re-created while a consumer is running. To retain backwards-compatibility,
1963+
// ReaderConfig.OffsetOutOfRangeError is being used to instruct the Reader to
1964+
// return an error in this case instead, allowing callers to react.
1965+
func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) {
1966+
r.config.OffsetOutOfRangeError = true
1967+
1968+
topic := r.config.Topic
1969+
1970+
// add 1 message to the topic
1971+
prepareReader(t, ctx, r, makeTestSequence(1)...)
1972+
1973+
// consume the message (moving the offset from 0 -> 1)
1974+
_, err := r.ReadMessage(ctx)
1975+
require.NoError(t, err)
1976+
1977+
// destroy the topic, then recreate it so the offset now becomes 0
1978+
deleteTopic(t, topic)
1979+
createTopic(t, topic, 1)
1980+
1981+
// expect an error, since the offset should now be out of range
1982+
_, err = r.ReadMessage(ctx)
1983+
require.ErrorIs(t, err, OffsetOutOfRange)
1984+
}

0 commit comments

Comments
 (0)