@@ -104,118 +104,124 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
104
104
log . error ( 'error during query - %e' , evt . detail )
105
105
} )
106
106
107
- signal . addEventListener ( 'abort' , ( ) => {
107
+ const onAbort = ( ) : void => {
108
108
queue . abort ( )
109
109
events . end ( new AbortError ( ) )
110
- } )
110
+ }
111
111
112
- // perform lookups on kadId, not the actual value
113
- const kadId = await convertBuffer ( key , {
114
- signal
115
- } )
112
+ signal . addEventListener ( 'abort' , onAbort )
116
113
117
- /**
118
- * Adds the passed peer to the query queue if it's not us and no other path
119
- * has passed through this peer
120
- */
121
- function queryPeer ( peer : PeerInfo , peerKadId : Uint8Array ) : void {
122
- if ( peer == null ) {
123
- return
124
- }
125
-
126
- peersSeen . add ( peer . id . toMultihash ( ) . bytes )
127
-
128
- const peerXor = uint8ArrayXor ( peerKadId , kadId )
129
-
130
- queue . add ( async ( ) => {
131
- try {
132
- for await ( const event of query ( {
133
- ...options ,
134
- key,
135
- peer,
136
- path : {
137
- index : path ,
138
- queued : queue . queued ,
139
- running : queue . running ,
140
- total : queue . size
141
- } ,
142
- numPaths,
143
- peerKadId,
144
- signal
145
- } ) ) {
146
- // if there are closer peers and the query has not completed, continue the query
147
- if ( event . name === 'PEER_RESPONSE' ) {
148
- for ( const closerPeer of event . closer ) {
149
- if ( peersSeen . has ( closerPeer . id . toMultihash ( ) . bytes ) ) { // eslint-disable-line max-depth
150
- log ( 'already seen %p in query' , closerPeer . id )
151
- continue
152
- }
114
+ try {
115
+ // perform lookups on kadId, not the actual value
116
+ const kadId = await convertBuffer ( key , {
117
+ signal
118
+ } )
153
119
154
- if ( ourPeerId . equals ( closerPeer . id ) ) { // eslint-disable-line max-depth
155
- log ( 'not querying ourselves' )
156
- continue
157
- }
120
+ /**
121
+ * Adds the passed peer to the query queue if it's not us and no other path
122
+ * has passed through this peer
123
+ */
124
+ function queryPeer ( peer : PeerInfo , peerKadId : Uint8Array ) : void {
125
+ if ( peer == null ) {
126
+ return
127
+ }
158
128
159
- if ( ! ( await connectionManager . isDialable ( closerPeer . multiaddrs ) ) ) { // eslint-disable-line max-depth
160
- log ( 'not querying undialable peer' )
161
- continue
162
- }
129
+ peersSeen . add ( peer . id . toMultihash ( ) . bytes )
163
130
164
- const closerPeerKadId = await convertPeerId ( closerPeer . id , {
165
- signal
166
- } )
167
- const closerPeerXor = uint8ArrayXor ( closerPeerKadId , kadId )
131
+ const peerXor = uint8ArrayXor ( peerKadId , kadId )
168
132
169
- // only continue query if closer peer is actually closer
170
- if ( uint8ArrayXorCompare ( closerPeerXor , peerXor ) !== - 1 ) { // eslint-disable-line max-depth
171
- log ( 'skipping %p as they are not closer to %b than %p' , closerPeer . id , key , peer )
172
- continue
133
+ queue . add ( async ( ) => {
134
+ try {
135
+ for await ( const event of query ( {
136
+ ...options ,
137
+ key,
138
+ peer,
139
+ path : {
140
+ index : path ,
141
+ queued : queue . queued ,
142
+ running : queue . running ,
143
+ total : queue . size
144
+ } ,
145
+ numPaths,
146
+ peerKadId,
147
+ signal
148
+ } ) ) {
149
+ // if there are closer peers and the query has not completed, continue the query
150
+ if ( event . name === 'PEER_RESPONSE' ) {
151
+ for ( const closerPeer of event . closer ) {
152
+ if ( peersSeen . has ( closerPeer . id . toMultihash ( ) . bytes ) ) { // eslint-disable-line max-depth
153
+ log ( 'already seen %p in query' , closerPeer . id )
154
+ continue
155
+ }
156
+
157
+ if ( ourPeerId . equals ( closerPeer . id ) ) { // eslint-disable-line max-depth
158
+ log ( 'not querying ourselves' )
159
+ continue
160
+ }
161
+
162
+ if ( ! ( await connectionManager . isDialable ( closerPeer . multiaddrs ) ) ) { // eslint-disable-line max-depth
163
+ log ( 'not querying undialable peer' )
164
+ continue
165
+ }
166
+
167
+ const closerPeerKadId = await convertPeerId ( closerPeer . id , {
168
+ signal
169
+ } )
170
+ const closerPeerXor = uint8ArrayXor ( closerPeerKadId , kadId )
171
+
172
+ // only continue query if closer peer is actually closer
173
+ if ( uint8ArrayXorCompare ( closerPeerXor , peerXor ) !== - 1 ) { // eslint-disable-line max-depth
174
+ log ( 'skipping %p as they are not closer to %b than %p' , closerPeer . id , key , peer )
175
+ continue
176
+ }
177
+
178
+ log ( 'querying closer peer %p' , closerPeer . id )
179
+ queryPeer ( closerPeer , closerPeerKadId )
173
180
}
174
-
175
- log ( 'querying closer peer %p' , closerPeer . id )
176
- queryPeer ( closerPeer , closerPeerKadId )
177
181
}
178
- }
179
182
180
- events . push ( {
181
- ...event ,
183
+ events . push ( {
184
+ ...event ,
185
+ path : {
186
+ index : path ,
187
+ queued : queue . queued ,
188
+ running : queue . running ,
189
+ total : queue . size
190
+ }
191
+ } )
192
+ }
193
+ } catch ( err : any ) {
194
+ // yield error event if query is continuing
195
+ events . push ( queryErrorEvent ( {
196
+ from : peer . id ,
197
+ error : err ,
182
198
path : {
183
199
index : path ,
184
200
queued : queue . queued ,
185
- running : queue . running ,
186
- total : queue . size
201
+ running : queue . running - 1 ,
202
+ total : queue . size - 1
187
203
}
188
- } )
204
+ } , options ) )
189
205
}
190
- } catch ( err : any ) {
191
- // yield error event if query is continuing
192
- events . push ( queryErrorEvent ( {
193
- from : peer . id ,
194
- error : err ,
195
- path : {
196
- index : path ,
197
- queued : queue . queued ,
198
- running : queue . running - 1 ,
199
- total : queue . size - 1
200
- }
201
- } , options ) )
202
- }
203
- } , {
204
- distance : peerXor
205
- } ) . catch ( err => {
206
- log . error ( 'error during query - %e' , err )
207
- } )
208
- }
209
-
210
- // begin the query with the starting peers
211
- await Promise . all (
212
- startingPeers . map ( async startingPeer => {
213
- queryPeer ( { id : startingPeer , multiaddrs : [ ] } , await convertPeerId ( startingPeer , {
214
- signal
215
- } ) )
216
- } )
217
- )
206
+ } , {
207
+ distance : peerXor
208
+ } ) . catch ( err => {
209
+ log . error ( 'error during query - %e' , err )
210
+ } )
211
+ }
218
212
219
- // yield results as they come in
220
- yield * events
213
+ // begin the query with the starting peers
214
+ await Promise . all (
215
+ startingPeers . map ( async startingPeer => {
216
+ queryPeer ( { id : startingPeer , multiaddrs : [ ] } , await convertPeerId ( startingPeer , {
217
+ signal
218
+ } ) )
219
+ } )
220
+ )
221
+
222
+ // yield results as they come in
223
+ yield * events
224
+ } finally {
225
+ signal . removeEventListener ( 'abort' , onAbort )
226
+ }
221
227
}
0 commit comments