Skip to content

Commit 2e02f37

Browse files
authored
Nettest fix (#788)
* don't skip messages when offset == highwatermark
1 parent eebea66 commit 2e02f37

File tree

2 files changed

+10
-51
lines changed

2 files changed

+10
-51
lines changed

.circleci/config.yml

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -176,17 +176,6 @@ jobs:
176176
working_directory: *working_directory
177177
environment:
178178
KAFKA_VERSION: "2.4.1"
179-
180-
# Need to skip nettest to avoid these kinds of errors:
181-
# --- FAIL: TestConn/nettest (17.56s)
182-
# --- FAIL: TestConn/nettest/PingPong (7.40s)
183-
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
184-
# conntest.go:118: mismatching value: got 77, want 78
185-
# conntest.go:118: mismatching value: got 78, want 79
186-
# ...
187-
#
188-
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
189-
KAFKA_SKIP_NETTEST: "1"
190179
docker:
191180
- image: circleci/golang
192181
- image: wurstmeister/zookeeper
@@ -203,17 +192,6 @@ jobs:
203192
working_directory: *working_directory
204193
environment:
205194
KAFKA_VERSION: "2.6.0"
206-
207-
# Need to skip nettest to avoid these kinds of errors:
208-
# --- FAIL: TestConn/nettest (17.56s)
209-
# --- FAIL: TestConn/nettest/PingPong (7.40s)
210-
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
211-
# conntest.go:118: mismatching value: got 77, want 78
212-
# conntest.go:118: mismatching value: got 78, want 79
213-
# ...
214-
#
215-
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
216-
KAFKA_SKIP_NETTEST: "1"
217195
docker:
218196
- image: circleci/golang
219197
- image: wurstmeister/zookeeper
@@ -230,17 +208,6 @@ jobs:
230208
working_directory: *working_directory
231209
environment:
232210
KAFKA_VERSION: "2.7.1"
233-
234-
# Need to skip nettest to avoid these kinds of errors:
235-
# --- FAIL: TestConn/nettest (17.56s)
236-
# --- FAIL: TestConn/nettest/PingPong (7.40s)
237-
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
238-
# conntest.go:118: mismatching value: got 77, want 78
239-
# conntest.go:118: mismatching value: got 78, want 79
240-
# ...
241-
#
242-
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
243-
KAFKA_SKIP_NETTEST: "1"
244211
docker:
245212
- image: circleci/golang
246213
- image: wurstmeister/zookeeper

conn.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,9 @@ const (
133133
ReadCommitted IsolationLevel = 1
134134
)
135135

136-
var (
137-
// DefaultClientID is the default value used as ClientID of kafka
138-
// connections.
139-
DefaultClientID string
140-
)
136+
// DefaultClientID is the default value used as ClientID of kafka
137+
// connections.
138+
var DefaultClientID string
141139

142140
func init() {
143141
progname := filepath.Base(os.Args[0])
@@ -263,10 +261,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
263261
}
264262
for _, brokerMeta := range res.Brokers {
265263
if brokerMeta.NodeID == res.ControllerID {
266-
broker = Broker{ID: int(brokerMeta.NodeID),
264+
broker = Broker{
265+
ID: int(brokerMeta.NodeID),
267266
Port: int(brokerMeta.Port),
268267
Host: brokerMeta.Host,
269-
Rack: brokerMeta.Rack}
268+
Rack: brokerMeta.Rack,
269+
}
270270
break
271271
}
272272
}
@@ -322,7 +322,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
322322
err := c.readOperation(
323323
func(deadline time.Time, id int32) error {
324324
return c.writeRequest(findCoordinator, v0, id, request)
325-
326325
},
327326
func(deadline time.Time, size int) error {
328327
return expectZeroSize(func() (remain int, err error) {
@@ -752,9 +751,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
752751
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
753752
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
754753
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
755-
756754
var adjustedDeadline time.Time
757-
var maxFetch = int(c.fetchMaxBytes)
755+
maxFetch := int(c.fetchMaxBytes)
758756

759757
if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
760758
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
@@ -859,11 +857,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
859857

860858
var msgs *messageSetReader
861859
if err == nil {
862-
if highWaterMark == offset {
863-
msgs = &messageSetReader{empty: true}
864-
} else {
865-
msgs, err = newMessageSetReader(&c.rbuf, remain)
866-
}
860+
msgs, err = newMessageSetReader(&c.rbuf, remain)
867861
}
868862
if err == errShortRead {
869863
err = checkTimeoutErr(adjustedDeadline)
@@ -959,7 +953,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
959953
// connection. If there are none, the method fetches all partitions of the kafka
960954
// cluster.
961955
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
962-
963956
if len(topics) == 0 {
964957
if len(c.topic) != 0 {
965958
defaultTopics := [...]string{c.topic}
@@ -1188,7 +1181,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11881181
}
11891182
return size, err
11901183
}
1191-
11921184
})
11931185
if err != nil {
11941186
return size, err
@@ -1556,7 +1548,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15561548
return nil, err
15571549
}
15581550
if version == v1 {
1559-
var request = saslAuthenticateRequestV0{Data: data}
1551+
request := saslAuthenticateRequestV0{Data: data}
15601552
var response saslAuthenticateResponseV0
15611553

15621554
err := c.writeOperation(

0 commit comments

Comments
 (0)