Skip to content

Commit 94ae2dc

Browse files
committed
Add Connect and Disconnect functionality
GODRIVER-285 Change-Id: I1d2eae1cc94a93c0664665541b12682131a49018
1 parent 9bee77c commit 94ae2dc

File tree

21 files changed

+1388
-225
lines changed

21 files changed

+1388
-225
lines changed

core/connection/connection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ type Dialer interface {
5050
DialContext(ctx context.Context, network, address string) (net.Conn, error)
5151
}
5252

53+
// DialerFunc is a type implemented by functions that can be used as a Dialer.
54+
type DialerFunc func(ctx context.Context, network, address string) (net.Conn, error)
55+
56+
// DialContext implements the Dialer interface.
57+
func (df DialerFunc) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
58+
return df(ctx, network, address)
59+
}
60+
5361
// DefaultDialer is the Dialer implementation that is used by this package. Changing this
5462
// will also change the Dialer used for this package. This should only be changed why all
5563
// of the connections being made need to use a different Dialer. Most of the time, using a

core/connection/connection_test.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,83 @@
77
package connection
88

99
import (
10+
"context"
1011
"net"
12+
"sync"
1113
"testing"
1214
)
1315

1416
// bootstrapConnection creates a listener that will listen for a single connection
1517
// on the return address. The user provided run function will be called with the accepted
1618
// connection. The user is responsible for closing the connection.
17-
func bootstrapConnection(t *testing.T, run func(net.Conn)) net.Addr {
18-
l, err := net.Listen("tcp", ":0")
19+
func bootstrapConnections(t *testing.T, num int, run func(net.Conn)) net.Addr {
20+
l, err := net.Listen("tcp", "localhost:0")
1921
if err != nil {
2022
t.Errorf("Could not set up a listener: %v", err)
2123
t.FailNow()
2224
}
2325
go func() {
24-
c, err := l.Accept()
25-
if err != nil {
26-
t.Errorf("Could not accept a connection: %v", err)
26+
for i := 0; i < num; i++ {
27+
c, err := l.Accept()
28+
if err != nil {
29+
t.Errorf("Could not accept a connection: %v", err)
30+
}
31+
go run(c)
2732
}
2833
_ = l.Close()
29-
run(c)
3034
}()
3135
return l.Addr()
3236
}
37+
38+
type netconn struct {
39+
net.Conn
40+
closed chan struct{}
41+
d *dialer
42+
}
43+
44+
func (nc *netconn) Close() error {
45+
nc.closed <- struct{}{}
46+
nc.d.connclosed(nc)
47+
return nc.Conn.Close()
48+
}
49+
50+
type dialer struct {
51+
Dialer
52+
opened map[*netconn]struct{}
53+
closed map[*netconn]struct{}
54+
sync.Mutex
55+
}
56+
57+
func newdialer(d Dialer) *dialer {
58+
return &dialer{Dialer: d, opened: make(map[*netconn]struct{}), closed: make(map[*netconn]struct{})}
59+
}
60+
61+
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
62+
d.Lock()
63+
defer d.Unlock()
64+
c, err := d.Dialer.DialContext(ctx, network, address)
65+
if err != nil {
66+
return nil, err
67+
}
68+
nc := &netconn{Conn: c, closed: make(chan struct{}, 1), d: d}
69+
d.opened[nc] = struct{}{}
70+
return nc, nil
71+
}
72+
73+
func (d *dialer) connclosed(nc *netconn) {
74+
d.Lock()
75+
defer d.Unlock()
76+
d.closed[nc] = struct{}{}
77+
}
78+
79+
func (d *dialer) lenopened() int {
80+
d.Lock()
81+
defer d.Unlock()
82+
return len(d.opened)
83+
}
84+
85+
func (d *dialer) lenclosed() int {
86+
d.Lock()
87+
defer d.Unlock()
88+
return len(d.closed)
89+
}

core/connection/error.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,8 @@ type NetworkError struct {
3434
func (ne NetworkError) Error() string {
3535
return fmt.Sprintf("connection(%s): %s", ne.ConnectionID, ne.Wrapped.Error())
3636
}
37+
38+
// PoolError is an error returned from a Pool method.
39+
type PoolError string
40+
41+
func (pe PoolError) Error() string { return string(pe) }

0 commit comments

Comments
 (0)