Skip to content

fix(transport): improve WebSocket/WebRTC connection robustness #3222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export interface ConnectOptions extends LoggerOptions, ProgressOptions<WebRTCDia

export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger, onProgress }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
const { circuitAddress, targetPeer } = splitAddr(ma)
const ICE_GATHERING_TIMEOUT = 30000 // 30 seconds
const CONNECTION_TIMEOUT = 60000 // 60 seconds

metrics?.dialerEvents.increment({ open: true })

Expand All @@ -54,7 +56,6 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
})
} else {
onProgress?.(new CustomProgressEvent('webrtc:reuse-relay-connection'))

connection = connections[0]
}

Expand All @@ -74,7 +75,59 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
dataChannelOptions: dataChannel
})

// Track connection state
let isConnecting = true
let iceGatheringComplete = false
let connectionStateTimeout: NodeJS.Timeout
let iceGatheringTimeout: NodeJS.Timeout

try {
// Monitor ICE gathering state
peerConnection.onicegatheringstatechange = () => {
if (peerConnection.iceGatheringState === 'complete') {
iceGatheringComplete = true
clearTimeout(iceGatheringTimeout)
}
}

// Monitor connection state changes
peerConnection.onconnectionstatechange = () => {
log.trace('connection state changed to: %s', peerConnection.connectionState)

switch (peerConnection.connectionState) {
case 'connected':
isConnecting = false
clearTimeout(connectionStateTimeout)
break
case 'failed':
case 'disconnected':
case 'closed':
isConnecting = false
clearTimeout(connectionStateTimeout)
if (!iceGatheringComplete) {
log.error('connection failed before ICE gathering completed')
}
break
}
}

// Set timeouts
iceGatheringTimeout = setTimeout(() => {
if (!iceGatheringComplete) {
log.error('ICE gathering timed out after %d ms', ICE_GATHERING_TIMEOUT)
peerConnection.close()
throw new Error('ICE gathering timeout')
}
}, ICE_GATHERING_TIMEOUT)

connectionStateTimeout = setTimeout(() => {
if (isConnecting) {
log.error('connection establishment timed out after %d ms', CONNECTION_TIMEOUT)
peerConnection.close()
throw new Error('Connection timeout')
}
}, CONNECTION_TIMEOUT)

// we create the channel so that the RTCPeerConnection has a component for
// which to collect candidates. The label is not relevant to connection
// initiation but can be useful for debugging
Expand All @@ -100,14 +153,17 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
log.error('error sending ICE candidate', err)
})
}

peerConnection.onicecandidateerror = (event) => {
log.error('initiator ICE candidate error', event)
metrics?.dialerEvents.increment({ ice_error: true })
}

// create an offer
const offerSdp = await peerConnection.createOffer().catch(err => {
log.error('could not execute createOffer', err)
throw new SDPHandshakeFailedError('Failed to set createOffer')
metrics?.dialerEvents.increment({ offer_error: true })
throw new SDPHandshakeFailedError('Failed to create offer')
})

log.trace('initiator send SDP offer %s', offerSdp.sdp)
Expand All @@ -122,19 +178,22 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
// set offer as local description
await peerConnection.setLocalDescription(offerSdp).catch(err => {
log.error('could not execute setLocalDescription', err)
throw new SDPHandshakeFailedError('Failed to set localDescription')
metrics?.dialerEvents.increment({ local_description_error: true })
throw new SDPHandshakeFailedError('Failed to set local description')
})

onProgress?.(new CustomProgressEvent('webrtc:read-sdp-answer'))

log.trace('initiator read SDP answer')

// read answer
const answerMessage = await messageStream.read({
signal
})
// read answer with timeout
const answerMessage = await Promise.race([
messageStream.read({ signal }),
new Promise((_, reject) => setTimeout(() => reject(new Error('SDP answer timeout')), 30000))
])

if (answerMessage.type !== Message.Type.SDP_ANSWER) {
metrics?.dialerEvents.increment({ answer_error: true })
throw new SDPHandshakeFailedError('Remote should send an SDP answer')
}

Expand All @@ -143,7 +202,8 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data })
await peerConnection.setRemoteDescription(answerSdp).catch(err => {
log.error('could not execute setRemoteDescription', err)
throw new SDPHandshakeFailedError('Failed to set remoteDescription')
metrics?.dialerEvents.increment({ remote_description_error: true })
throw new SDPHandshakeFailedError('Failed to set remote description')
})

log.trace('initiator read candidates until connected')
Expand All @@ -168,6 +228,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
})

log.trace('initiator connected to remote address %s', ma)
metrics?.dialerEvents.increment({ success: true })

return {
remoteAddress: ma,
Expand All @@ -176,12 +237,17 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
}
} catch (err: any) {
log.error('outgoing signaling error', err)
metrics?.dialerEvents.increment({ error: true })

clearTimeout(iceGatheringTimeout)
clearTimeout(connectionStateTimeout)
peerConnection.close()
stream.abort(err)
throw err
} finally {
peerConnection.onicecandidate = null
peerConnection.onicecandidateerror = null
peerConnection.onicegatheringstatechange = null
peerConnection.onconnectionstatechange = null
}
}
98 changes: 72 additions & 26 deletions packages/transport-websockets/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,83 @@ class WebSockets implements Transport<WebSocketsDialEvents> {
const cOpts = ma.toOptions()
this.log('dialing %s:%s', cOpts.host, cOpts.port)

const errorPromise = pDefer()
const rawSocket = connect(toUri(ma), this.init)
rawSocket.socket.addEventListener('error', () => {
// the WebSocket.ErrorEvent type doesn't actually give us any useful
// information about what happened
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
const err = new ConnectionFailedError(`Could not connect to ${ma.toString()}`)
this.log.error('connection error:', err)
this.metrics?.dialerEvents.increment({ error: true })
errorPromise.reject(err)
})
const MAX_RETRIES = 3
const RETRY_DELAY_MS = 1000
let lastError: Error | undefined

try {
options.onProgress?.(new CustomProgressEvent('websockets:open-connection'))
await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal)
} catch (err: any) {
if (options.signal?.aborted) {
this.metrics?.dialerEvents.increment({ abort: true })
}
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
const errorPromise = pDefer<Error>()
const connectPromise = pDefer<void>()
const rawSocket = connect(toUri(ma), this.init)

rawSocket.close()
.catch(err => {
this.log.error('error closing raw socket', err)
})
// Track connection state
let isConnecting = true

// Handle WebSocket errors
rawSocket.socket.addEventListener('error', (event) => {
const err = new ConnectionFailedError(`WebSocket connection failed to ${ma.toString()}: ${(event as any).message ?? 'Unknown error'}`)
this.log.error('connection error (attempt %d/%d):', attempt, MAX_RETRIES, err)
this.metrics?.dialerEvents.increment({ error: true })
errorPromise.reject(err)
})

// Handle successful connection
rawSocket.socket.addEventListener('open', () => {
this.log('connection successful on attempt %d', attempt)
connectPromise.resolve()
})

try {
options.onProgress?.(new CustomProgressEvent('websockets:open-connection', { attempt }))

// Race between connection, error, and timeout
await raceSignal(
Promise.race([
connectPromise.promise,
errorPromise.promise,
// Add explicit connection timeout
new Promise((_, reject) => setTimeout(() => {
if (isConnecting) {
reject(new ConnectionFailedError(`Connection timeout after ${this.init.timeout ?? 30000}ms`))
}
}, this.init.timeout ?? 30000))
]),
options.signal
)

throw err
isConnecting = false
this.log('connected %s on attempt %d', ma, attempt)
this.metrics?.dialerEvents.increment({ connect: true })
return rawSocket
} catch (err: any) {
isConnecting = false
lastError = err

if (options.signal?.aborted) {
this.metrics?.dialerEvents.increment({ abort: true })
throw err
}

// Close the failed socket
rawSocket.close()
.catch(err => {
this.log.error('error closing raw socket after failure', err)
})

// If we have retries left, wait and try again
if (attempt < MAX_RETRIES) {
this.log('retrying connection after %dms...', RETRY_DELAY_MS)
await new Promise(resolve => setTimeout(resolve, RETRY_DELAY_MS))
continue
}

// No more retries, throw the last error
throw new ConnectionFailedError(`Failed to connect after ${MAX_RETRIES} attempts: ${lastError.message}`)
}
}

this.log('connected %s', ma)
this.metrics?.dialerEvents.increment({ connect: true })
return rawSocket
// This should never be reached due to the throw above
throw new ConnectionFailedError('Unexpected connection failure')
}

/**
Expand Down