Skip to content

Commit 451e011

Browse files
authored
fix: debounce sending identify push messages (#3193)
To handle the case where several protocol handlers are regstered and deregistered rapidly, introduce a small delay before actually sending the push messages. Fixes #2389
1 parent 53001ad commit 451e011

File tree

4 files changed

+116
-58
lines changed

4 files changed

+116
-58
lines changed

packages/protocol-identify/src/consts.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ export const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8
1313

1414
// https://github.com/libp2p/go-libp2p/blob/0385ec924bad172f74a74db09939e97c079b1420/p2p/protocol/identify/id.go#L47C7-L47C25
1515
export const MAX_PUSH_CONCURRENCY = 32
16+
17+
export const PUSH_DEBOUNCE_MS = 1_000

packages/protocol-identify/src/identify-push.ts

Lines changed: 73 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { serviceCapabilities } from '@libp2p/interface'
22
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
3+
import { debounce } from '@libp2p/utils/debounce'
34
import { protocols } from '@multiformats/multiaddr'
45
import drain from 'it-drain'
56
import parallel from 'it-parallel'
@@ -9,7 +10,8 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
910
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
1011
import {
1112
MULTICODEC_IDENTIFY_PUSH_PROTOCOL_NAME,
12-
MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION
13+
MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION,
14+
PUSH_DEBOUNCE_MS
1315
} from './consts.js'
1416
import { Identify as IdentifyMessage } from './pb/message.js'
1517
import { AbstractIdentify, consumeIdentifyMessage, defaultValues } from './utils.js'
@@ -20,6 +22,7 @@ import type { ConnectionManager } from '@libp2p/interface-internal'
2022
export class IdentifyPush extends AbstractIdentify implements Startable, IdentifyPushInterface {
2123
private readonly connectionManager: ConnectionManager
2224
private readonly concurrency: number
25+
private _push: () => void
2326

2427
constructor (components: IdentifyPushComponents, init: IdentifyPushInit = {}) {
2528
super(components, {
@@ -31,10 +34,14 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
3134
this.connectionManager = components.connectionManager
3235
this.concurrency = init.concurrency ?? defaultValues.concurrency
3336

37+
this._push = debounce(this.sendPushMessage.bind(this), init.debounce ?? PUSH_DEBOUNCE_MS)
38+
3439
if ((init.runOnSelfUpdate ?? defaultValues.runOnSelfUpdate)) {
3540
// When self peer record changes, trigger identify-push
3641
components.events.addEventListener('self:peer:update', (evt) => {
37-
void this.push().catch(err => { this.log.error(err) })
42+
this.push().catch(err => {
43+
this.log.error('error pushing updates to peers - %e', err)
44+
})
3845
})
3946
}
4047
}
@@ -47,72 +54,80 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
4754
* Calls `push` on all peer connections
4855
*/
4956
async push (): Promise<void> {
57+
this._push()
58+
}
59+
60+
private async sendPushMessage (): Promise<void> {
5061
// Do not try to push if we are not running
5162
if (!this.isStarted()) {
5263
return
5364
}
5465

55-
const listenAddresses = this.addressManager.getAddresses().map(ma => ma.decapsulateCode(protocols('p2p').code))
56-
const peerRecord = new PeerRecord({
57-
peerId: this.peerId,
58-
multiaddrs: listenAddresses
59-
})
60-
const signedPeerRecord = await RecordEnvelope.seal(peerRecord, this.privateKey)
61-
const supportedProtocols = this.registrar.getProtocols()
62-
const peer = await this.peerStore.get(this.peerId)
63-
const agentVersion = uint8ArrayToString(peer.metadata.get('AgentVersion') ?? uint8ArrayFromString(this.host.agentVersion))
64-
const protocolVersion = uint8ArrayToString(peer.metadata.get('ProtocolVersion') ?? uint8ArrayFromString(this.host.protocolVersion))
65-
const self = this
66-
67-
async function * pushToConnections (): AsyncGenerator<() => Promise<void>> {
68-
for (const connection of self.connectionManager.getConnections()) {
69-
const peer = await self.peerStore.get(connection.remotePeer)
70-
71-
if (!peer.protocols.includes(self.protocol)) {
72-
continue
73-
}
66+
try {
67+
const listenAddresses = this.addressManager.getAddresses().map(ma => ma.decapsulateCode(protocols('p2p').code))
68+
const peerRecord = new PeerRecord({
69+
peerId: this.peerId,
70+
multiaddrs: listenAddresses
71+
})
72+
const signedPeerRecord = await RecordEnvelope.seal(peerRecord, this.privateKey)
73+
const supportedProtocols = this.registrar.getProtocols()
74+
const peer = await this.peerStore.get(this.peerId)
75+
const agentVersion = uint8ArrayToString(peer.metadata.get('AgentVersion') ?? uint8ArrayFromString(this.host.agentVersion))
76+
const protocolVersion = uint8ArrayToString(peer.metadata.get('ProtocolVersion') ?? uint8ArrayFromString(this.host.protocolVersion))
77+
const self = this
78+
79+
async function * pushToConnections (): AsyncGenerator<() => Promise<void>> {
80+
for (const connection of self.connectionManager.getConnections()) {
81+
const peer = await self.peerStore.get(connection.remotePeer)
82+
83+
if (!peer.protocols.includes(self.protocol)) {
84+
continue
85+
}
7486

75-
yield async () => {
76-
let stream: Stream | undefined
77-
const signal = AbortSignal.timeout(self.timeout)
78-
79-
setMaxListeners(Infinity, signal)
80-
81-
try {
82-
stream = await connection.newStream(self.protocol, {
83-
signal,
84-
runOnLimitedConnection: self.runOnLimitedConnection
85-
})
86-
87-
const pb = pbStream(stream, {
88-
maxDataLength: self.maxMessageSize
89-
}).pb(IdentifyMessage)
90-
91-
await pb.write({
92-
listenAddrs: listenAddresses.map(ma => ma.bytes),
93-
signedPeerRecord: signedPeerRecord.marshal(),
94-
protocols: supportedProtocols,
95-
agentVersion,
96-
protocolVersion
97-
}, {
98-
signal
99-
})
100-
101-
await stream.close({
102-
signal
103-
})
104-
} catch (err: any) {
105-
// Just log errors
106-
self.log.error('could not push identify update to peer', err)
107-
stream?.abort(err)
87+
yield async () => {
88+
let stream: Stream | undefined
89+
const signal = AbortSignal.timeout(self.timeout)
90+
91+
setMaxListeners(Infinity, signal)
92+
93+
try {
94+
stream = await connection.newStream(self.protocol, {
95+
signal,
96+
runOnLimitedConnection: self.runOnLimitedConnection
97+
})
98+
99+
const pb = pbStream(stream, {
100+
maxDataLength: self.maxMessageSize
101+
}).pb(IdentifyMessage)
102+
103+
await pb.write({
104+
listenAddrs: listenAddresses.map(ma => ma.bytes),
105+
signedPeerRecord: signedPeerRecord.marshal(),
106+
protocols: supportedProtocols,
107+
agentVersion,
108+
protocolVersion
109+
}, {
110+
signal
111+
})
112+
113+
await stream.close({
114+
signal
115+
})
116+
} catch (err: any) {
117+
// Just log errors
118+
self.log.error('could not push identify update to peer', err)
119+
stream?.abort(err)
120+
}
108121
}
109122
}
110123
}
111-
}
112124

113-
await drain(parallel(pushToConnections(), {
114-
concurrency: this.concurrency
115-
}))
125+
await drain(parallel(pushToConnections(), {
126+
concurrency: this.concurrency
127+
}))
128+
} catch (err: any) {
129+
this.log.error('error pushing updates to peers - %e', err)
130+
}
116131
}
117132

118133
/**

packages/protocol-identify/src/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ export interface IdentifyPushInit extends Omit<IdentifyInit, 'runOnConnectionOpe
126126
* @default 32
127127
*/
128128
concurrency?: number
129+
130+
/**
131+
* To prevent rapid flurries of network activity when addresses/protocols
132+
* change rapidly in succession, debounce the sending of push message by this
133+
* amount in ms
134+
*
135+
* @default 1_000
136+
*/
137+
debounce?: number
129138
}
130139

131140
export interface IdentifyComponents {

packages/protocol-identify/test/push.spec.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { defaultLogger } from '@libp2p/logger'
44
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
55
import { multiaddr } from '@multiformats/multiaddr'
66
import { expect } from 'aegir/chai'
7+
import delay from 'delay'
78
import { pair } from 'it-pair'
89
import { pbStream } from 'it-protobuf-stream'
910
import { TypedEventEmitter } from 'main-event'
@@ -164,4 +165,35 @@ describe('identify (push)', () => {
164165
expect(components.peerStore.patch.callCount).to.equal(0, 'patched peer when push timed out')
165166
expect(stream.abort.callCount).to.equal(1, 'did not abort stream')
166167
})
168+
169+
it('should debounce outgoing pushes', async () => {
170+
identify = new IdentifyPush(components, {
171+
timeout: 10
172+
})
173+
174+
await start(identify)
175+
176+
components.addressManager.getAddresses.returns([multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${components.peerId}`)])
177+
components.registrar.getProtocols.returns(['/super/fun/protocol'])
178+
components.peerStore.get.withArgs(components.peerId).resolves({
179+
id: components.peerId,
180+
addresses: [],
181+
protocols: [],
182+
metadata: new Map(),
183+
tags: new Map()
184+
})
185+
186+
expect(components.connectionManager.getConnections.called).to.be.false()
187+
188+
identify.push()
189+
identify.push()
190+
identify.push()
191+
identify.push()
192+
identify.push()
193+
identify.push()
194+
195+
await delay(2_000)
196+
197+
expect(components.connectionManager.getConnections.callCount).to.equal(1)
198+
})
167199
})

0 commit comments

Comments
 (0)