Skip to content

Commit 8f063ce

Browse files
feat(reader): handle io.ErrNoProgress more gracefully (#941)
1 parent ba6f442 commit 8f063ce

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,14 @@ func (r *reader) run(ctx context.Context, offset int64) {
13211321
errcount = 0
13221322
continue
13231323

1324+
case errors.Is(err, io.ErrNoProgress):
1325+
// This error is returned by the Conn when it believes the connection
1326+
// has been corrupted, so we need to explicitly close it. Since we are
1327+
// explicitly handling it and a retry will pick up, we can suppress the
1328+
// error metrics and logs for this case.
1329+
conn.Close()
1330+
break readLoop
1331+
13241332
case errors.Is(err, UnknownTopicOrPartition):
13251333
r.withErrorLogger(func(log Logger) {
13261334
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)

0 commit comments

Comments
 (0)