Skip to content

Commit f487e01

Browse files
authored
Skip refresh metadata for errored topic, fix #806 (#820)
* Skip refresh metadata for errored topic. Skip refrash metadata in roundTrip about errored topic, such as write message to a not exist topic, kafka will response error UNKNOWN_TOPIC_OR_PARTITION to the topic, it causes indefinitely metadata refreshing. Fixes #806 * Create unit test to detect issue #806
1 parent cdc927e commit f487e01

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

transport.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,18 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
416416
// we didn't have that topic in our cache so we should update
417417
// the cache.
418418
if m.AllowAutoTopicCreation {
419-
p.refreshMetadata(ctx, m.TopicNames)
419+
topicsToRefresh := make([]string, 0, len(resp.Topics))
420+
for _, topic := range resp.Topics {
421+
// fixes issue 806: don't refresh topics that failed to create,
422+
// it may means kafka doesn't enable auto topic creation.
423+
// This causes the library to hang indefinitely, same as createtopics process.
424+
if topic.ErrorCode != 0 {
425+
continue
426+
}
427+
428+
topicsToRefresh = append(topicsToRefresh, topic.Name)
429+
}
430+
p.refreshMetadata(ctx, topicsToRefresh)
420431
}
421432
}
422433

transport_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/segmentio/kafka-go/protocol"
1212
"github.com/segmentio/kafka-go/protocol/createtopics"
13+
meta "github.com/segmentio/kafka-go/protocol/metadata"
1314
)
1415

1516
func TestIssue477(t *testing.T) {
@@ -165,3 +166,141 @@ func TestIssue672(t *testing.T) {
165166
t.Fatalf("expected a createtopics.Response but got %T", r)
166167
}
167168
}
169+
170+
func TestIssue806(t *testing.T) {
171+
// ensure the test times out if the bug is re-introduced
172+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
173+
defer cancel()
174+
175+
// simulate unknown topic want auto create with unknownTopicName,
176+
const unknownTopicName = "unknown-topic"
177+
const okTopicName = "good-topic"
178+
179+
// make the connection pool think it's immediately ready to send
180+
ready := make(chan struct{})
181+
close(ready)
182+
183+
// allow the system to wake as much as it wants
184+
wake := make(chan event)
185+
defer close(wake)
186+
go func() {
187+
for {
188+
select {
189+
case <-ctx.Done():
190+
return
191+
case e := <-wake:
192+
if e == nil {
193+
return
194+
}
195+
e.trigger()
196+
}
197+
}
198+
}()
199+
200+
// handle requests by immediately resolving them with a create topics response,
201+
// the "unknown topic" will have err UNKNOWN_TOPIC_OR_PARTITION
202+
requests := make(chan connRequest, 1)
203+
defer close(requests)
204+
go func() {
205+
request := <-requests
206+
request.res.resolve(&meta.Response{
207+
Topics: []meta.ResponseTopic{
208+
{
209+
Name: unknownTopicName,
210+
ErrorCode: int16(UnknownTopicOrPartition),
211+
},
212+
{
213+
Name: okTopicName,
214+
Partitions: []meta.ResponsePartition{
215+
{
216+
PartitionIndex: 0,
217+
},
218+
},
219+
},
220+
},
221+
})
222+
}()
223+
224+
pool := &connPool{
225+
ready: ready,
226+
wake: wake,
227+
conns: map[int32]*connGroup{},
228+
}
229+
230+
// configure the state,
231+
//
232+
// set cached metadata only have good topic,
233+
// so it need to request metadata,
234+
// caused by unknown topic cannot find in cached metadata
235+
//
236+
// set layout only have good topic,
237+
// so it can find the good topic, but not the one that fails to create
238+
pool.setState(connPoolState{
239+
metadata: &meta.Response{
240+
Topics: []meta.ResponseTopic{
241+
{
242+
Name: okTopicName,
243+
Partitions: []meta.ResponsePartition{
244+
{
245+
PartitionIndex: 0,
246+
},
247+
},
248+
},
249+
},
250+
},
251+
layout: protocol.Cluster{
252+
Topics: map[string]protocol.Topic{
253+
okTopicName: {
254+
Name: okTopicName,
255+
Partitions: map[int32]protocol.Partition{
256+
0: {},
257+
},
258+
},
259+
},
260+
},
261+
})
262+
263+
// trick the connection pool into thinking it has a valid connection to request metadata
264+
pool.ctrl = &connGroup{
265+
pool: pool,
266+
broker: Broker{},
267+
idleConns: []*conn{
268+
{
269+
reqs: requests,
270+
},
271+
},
272+
}
273+
274+
// perform the round trip:
275+
// - if the issue is presenting this will hang waiting for metadata to arrive that will
276+
// never arrive, causing a deadline timeout.
277+
// - if the issue is fixed this will resolve almost instantaneously
278+
r, err := pool.roundTrip(ctx, &meta.Request{
279+
TopicNames: []string{unknownTopicName},
280+
AllowAutoTopicCreation: true,
281+
})
282+
// detect if the issue is presenting using the context timeout (note that checking the err return value
283+
// isn't good enough as the original implementation didn't return the context cancellation error due to
284+
// being run in a defer)
285+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
286+
t.Fatalf("issue 806 is presenting! roundTrip should not have timed out")
287+
}
288+
289+
// ancillary assertions as general house-keeping, not directly related to the issue:
290+
291+
// we're not expecting any errors in this test
292+
if err != nil {
293+
t.Fatalf("unexpected error provoking connection pool roundTrip: %v", err)
294+
}
295+
296+
// we expect a response containing the errors from the broker
297+
if r == nil {
298+
t.Fatal("expected a non-nil response")
299+
}
300+
301+
// we expect to have the create topic response with created earlier
302+
_, ok := r.(*meta.Response)
303+
if !ok {
304+
t.Fatalf("expected a meta.Response but got %T", r)
305+
}
306+
}

0 commit comments

Comments
 (0)