@@ -5,7 +5,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
5
5
import {
6
6
RELAY_V2_HOP_CODEC
7
7
} from '../constants.js'
8
- import type { ComponentLogger , Logger , Peer , PeerId , PeerStore , Startable , TopologyFilter } from '@libp2p/interface'
8
+ import type { ComponentLogger , Libp2pEvents , Logger , Peer , PeerId , PeerInfo , PeerStore , Startable , TopologyFilter , TypedEventTarget } from '@libp2p/interface'
9
9
import type { ConnectionManager , RandomWalk , Registrar , TransportManager } from '@libp2p/interface-internal'
10
10
11
11
export interface RelayDiscoveryEvents {
@@ -19,6 +19,7 @@ export interface RelayDiscoveryComponents {
19
19
registrar : Registrar
20
20
logger : ComponentLogger
21
21
randomWalk : RandomWalk
22
+ events : TypedEventTarget < Libp2pEvents >
22
23
}
23
24
24
25
export interface RelayDiscoveryInit {
@@ -30,10 +31,7 @@ export interface RelayDiscoveryInit {
30
31
* peers that support the circuit v2 HOP protocol.
31
32
*/
32
33
export class RelayDiscovery extends TypedEventEmitter < RelayDiscoveryEvents > implements Startable {
33
- private readonly peerStore : PeerStore
34
- private readonly registrar : Registrar
35
- private readonly connectionManager : ConnectionManager
36
- private readonly randomWalk : RandomWalk
34
+ private readonly components : RelayDiscoveryComponents
37
35
private started : boolean
38
36
private running : boolean
39
37
private topologyId ?: string
@@ -46,15 +44,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
46
44
super ( )
47
45
48
46
this . log = components . logger . forComponent ( 'libp2p:circuit-relay:discover-relays' )
47
+ this . components = components
49
48
this . started = false
50
49
this . running = false
51
- this . peerStore = components . peerStore
52
- this . registrar = components . registrar
53
- this . connectionManager = components . connectionManager
54
- this . randomWalk = components . randomWalk
55
50
this . filter = init . filter
56
51
this . discoveryController = new AbortController ( )
57
52
setMaxListeners ( Infinity , this . discoveryController . signal )
53
+ this . dialPeer = this . dialPeer . bind ( this )
54
+ this . onPeer = this . onPeer . bind ( this )
58
55
}
59
56
60
57
isStarted ( ) : boolean {
@@ -64,7 +61,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
64
61
async start ( ) : Promise < void > {
65
62
// register a topology listener for when new peers are encountered
66
63
// that support the hop protocol
67
- this . topologyId = await this . registrar . register ( RELAY_V2_HOP_CODEC , {
64
+ this . topologyId = await this . components . registrar . register ( RELAY_V2_HOP_CODEC , {
68
65
filter : this . filter ,
69
66
onConnect : ( peerId ) => {
70
67
this . log . trace ( 'discovered relay %p queue (length: %d, active %d)' , peerId , this . queue ?. size , this . queue ?. running )
@@ -77,10 +74,13 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
77
74
78
75
stop ( ) : void {
79
76
if ( this . topologyId != null ) {
80
- this . registrar . unregister ( this . topologyId )
77
+ this . components . registrar . unregister ( this . topologyId )
78
+ }
79
+
80
+ if ( this . running ) {
81
+ this . stopDiscovery ( )
81
82
}
82
83
83
- this . discoveryController ?. abort ( )
84
84
this . started = false
85
85
}
86
86
@@ -90,7 +90,8 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
90
90
*
91
91
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected to
92
92
* 2. Dial and try to listen on the peers we know that support hop but are not connected
93
- * 3. Search the network
93
+ * 3. Search the network - this requires a peer routing implementation to be configured but will fail gracefully
94
+ * 4. Dial any peers discovered - this covers when no peer routing implementation has been configured but some peer discovery mechanism is also present
94
95
*/
95
96
startDiscovery ( ) : void {
96
97
if ( this . running ) {
@@ -102,11 +103,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
102
103
this . discoveryController = new AbortController ( )
103
104
setMaxListeners ( Infinity , this . discoveryController . signal )
104
105
106
+ // dial any peer we discover
107
+ this . components . events . addEventListener ( 'peer:discovery' , this . onPeer )
108
+
105
109
Promise . resolve ( )
106
110
. then ( async ( ) => {
107
111
this . log ( 'searching peer store for relays' )
108
112
109
- const peers = ( await this . peerStore . all ( {
113
+ const peers = ( await this . components . peerStore . all ( {
110
114
filters : [
111
115
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
112
116
( peer ) => {
@@ -149,7 +153,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
149
153
150
154
this . log ( 'start random walk' )
151
155
152
- for await ( const peer of this . randomWalk . walk ( { signal : this . discoveryController . signal } ) ) {
156
+ for await ( const peer of this . components . randomWalk . walk ( { signal : this . discoveryController . signal } ) ) {
153
157
this . log . trace ( 'found random peer %p' , peer . id )
154
158
155
159
if ( queue . has ( peer . id ) ) {
@@ -159,14 +163,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
159
163
continue
160
164
}
161
165
162
- if ( this . connectionManager . getConnections ( peer . id ) ?. length > 0 ) {
166
+ if ( this . components . connectionManager . getConnections ( peer . id ) ?. length > 0 ) {
163
167
this . log . trace ( 'random peer %p was already connected' , peer . id )
164
168
165
169
// skip peers we are already connected to
166
170
continue
167
171
}
168
172
169
- if ( ! ( await this . connectionManager . isDialable ( peer . multiaddrs ) ) ) {
173
+ if ( ! ( await this . components . connectionManager . isDialable ( peer . multiaddrs ) ) ) {
170
174
this . log . trace ( 'random peer %p was not dialable' , peer . id , peer . multiaddrs . map ( ma => ma . toString ( ) ) )
171
175
172
176
// skip peers we can't dial
@@ -186,16 +190,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
186
190
187
191
// dial the peer - this will cause identify to run and our topology to
188
192
// be notified and we'll attempt to create reservations
189
- queue . add ( async ( ) => {
190
- const signal = anySignal ( [ this . discoveryController . signal , AbortSignal . timeout ( 5000 ) ] )
191
- setMaxListeners ( Infinity , signal )
192
-
193
- try {
194
- await this . connectionManager . openConnection ( peer . id , { signal } )
195
- } finally {
196
- signal . clear ( )
197
- }
198
- } , {
193
+ queue . add ( this . dialPeer , {
199
194
peerId : peer . id ,
200
195
signal : this . discoveryController . signal
201
196
} )
@@ -219,6 +214,70 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
219
214
this . log ( 'stop discovery' )
220
215
this . running = false
221
216
this . discoveryController ?. abort ( )
217
+ this . queue ?. clear ( )
218
+
219
+ // stop dialing any peer we discover
220
+ this . components . events . removeEventListener ( 'peer:discovery' , this . onPeer )
221
+ }
222
+
223
+ onPeer ( evt : CustomEvent < PeerInfo > ) : void {
224
+ this . log . trace ( 'maybe dialing discovered peer %p - %e' , evt . detail . id )
225
+
226
+ this . maybeDialPeer ( evt )
227
+ . catch ( err => {
228
+ this . log . trace ( 'error dialing discovered peer %p - %e' , evt . detail . id , err )
229
+ } )
230
+ }
231
+
232
+ async maybeDialPeer ( evt : CustomEvent < PeerInfo > ) : Promise < void > {
233
+ if ( this . queue == null ) {
234
+ return
235
+ }
236
+
237
+ const peerId = evt . detail . id
238
+ const multiaddrs = evt . detail . multiaddrs
239
+
240
+ if ( this . queue . has ( peerId ) ) {
241
+ this . log . trace ( 'random peer %p was already in queue' , peerId )
242
+
243
+ // skip peers already in the queue
244
+ return
245
+ }
246
+
247
+ if ( this . components . connectionManager . getConnections ( peerId ) ?. length > 0 ) {
248
+ this . log . trace ( 'random peer %p was already connected' , peerId )
249
+
250
+ // skip peers we are already connected to
251
+ return
252
+ }
253
+
254
+ if ( ! ( await this . components . connectionManager . isDialable ( multiaddrs ) ) ) {
255
+ this . log . trace ( 'random peer %p was not dialable' , peerId )
256
+
257
+ // skip peers we can't dial
258
+ return
259
+ }
260
+
261
+ this . queue ?. add ( this . dialPeer , {
262
+ peerId : evt . detail . id ,
263
+ signal : this . discoveryController . signal
264
+ } )
265
+ . catch ( err => {
266
+ this . log . error ( 'error opening connection to discovered peer %p' , evt . detail . id , err )
267
+ } )
268
+ }
269
+
270
+ async dialPeer ( { peerId, signal } : { peerId : PeerId , signal ?: AbortSignal } ) : Promise < void > {
271
+ const combinedSignal = anySignal ( [ AbortSignal . timeout ( 5_000 ) , signal ] )
272
+ setMaxListeners ( Infinity , combinedSignal )
273
+
274
+ try {
275
+ await this . components . connectionManager . openConnection ( peerId , {
276
+ signal : combinedSignal
277
+ } )
278
+ } finally {
279
+ combinedSignal . clear ( )
280
+ }
222
281
}
223
282
}
224
283
0 commit comments