Skip to content

Commit e7c2c10

Browse files
authored
Merge pull request #925 from rhansen2/cyx-fix-handle-error-in-response
fix: handle error in response
2 parents 4788faf + 4f3f3dd commit e7c2c10

File tree

2 files changed

+92
-8
lines changed

2 files changed

+92
-8
lines changed

protocol/describeconfigs/describeconfigs.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package describeconfigs
22

33
import (
4+
"fmt"
45
"strconv"
56

67
"github.com/segmentio/kafka-go/protocol"
@@ -87,11 +88,19 @@ func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
8788
response := &Response{}
8889

8990
for _, result := range results {
90-
brokerResp := result.(*Response)
91-
response.Resources = append(
92-
response.Resources,
93-
brokerResp.Resources...,
94-
)
91+
switch v := result.(type) {
92+
case *Response:
93+
response.Resources = append(
94+
response.Resources,
95+
v.Resources...,
96+
)
97+
98+
case error:
99+
return nil, v
100+
101+
default:
102+
panic(fmt.Sprintf("unknown result type in Merge: %T", result))
103+
}
95104
}
96105

97106
return response, nil
@@ -123,6 +132,4 @@ type ResponseConfigSynonym struct {
123132
ConfigSource int8 `kafka:"min=v1,max=v3"`
124133
}
125134

126-
var (
127-
_ protocol.BrokerMessage = (*Request)(nil)
128-
)
135+
var _ protocol.BrokerMessage = (*Request)(nil)
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package describeconfigs
2+
3+
import (
4+
"errors"
5+
"io"
6+
"reflect"
7+
"testing"
8+
9+
"github.com/segmentio/kafka-go/protocol"
10+
)
11+
12+
func TestResponse_Merge(t *testing.T) {
13+
t.Run("happy path", func(t *testing.T) {
14+
r := &Response{}
15+
16+
r1 := &Response{
17+
Resources: []ResponseResource{
18+
{ResourceName: "r1"},
19+
},
20+
}
21+
r2 := &Response{
22+
Resources: []ResponseResource{
23+
{ResourceName: "r2"},
24+
},
25+
}
26+
27+
got, err := r.Merge([]protocol.Message{&Request{}}, []interface{}{r1, r2})
28+
if err != nil {
29+
t.Fatal(err)
30+
}
31+
32+
want := &Response{
33+
Resources: []ResponseResource{
34+
{ResourceName: "r1"},
35+
{ResourceName: "r2"},
36+
},
37+
}
38+
39+
if !reflect.DeepEqual(want, got) {
40+
t.Fatalf("wanted response: \n%+v, got \n%+v", want, got)
41+
}
42+
})
43+
44+
t.Run("with errors", func(t *testing.T) {
45+
r := &Response{}
46+
47+
r1 := &Response{
48+
Resources: []ResponseResource{
49+
{ResourceName: "r1"},
50+
},
51+
}
52+
53+
_, err := r.Merge([]protocol.Message{&Request{}}, []interface{}{r1, io.EOF})
54+
if !errors.Is(err, io.EOF) {
55+
t.Fatalf("wanted err io.EOF, got %v", err)
56+
}
57+
})
58+
59+
t.Run("panic with unexpected type", func(t *testing.T) {
60+
defer func() {
61+
msg := recover()
62+
if msg != "unknown result type in Merge: string" {
63+
t.Fatal("unexpected panic", msg)
64+
}
65+
}()
66+
r := &Response{}
67+
68+
r1 := &Response{
69+
Resources: []ResponseResource{
70+
{ResourceName: "r1"},
71+
},
72+
}
73+
74+
_, _ = r.Merge([]protocol.Message{&Request{}}, []interface{}{r1, "how did a string got here"})
75+
t.Fatal("did not panic")
76+
})
77+
}

0 commit comments

Comments
 (0)