From 91d6ab97ac6c9b5ccad5556fa72844c6be1c60f4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 8 Jul 2025 11:28:11 +0200 Subject: [PATCH] feat: allow async stream handlers Allow `await`ing promises inside stream handlers. --- .../src/mocks/connection.ts | 4 ++-- packages/interface/src/stream-handler.ts | 2 +- packages/libp2p/src/upgrader.ts | 14 ++++---------- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/packages/interface-compliance-tests/src/mocks/connection.ts b/packages/interface-compliance-tests/src/mocks/connection.ts index 7cc2c34a0e..6eae7809e0 100644 --- a/packages/interface-compliance-tests/src/mocks/connection.ts +++ b/packages/interface-compliance-tests/src/mocks/connection.ts @@ -143,7 +143,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio mss.handle(muxedStream, registrar.getProtocols(), { log }) - .then(({ stream, protocol }) => { + .then(async ({ stream, protocol }) => { log('%s: incoming stream opened on %s', direction, protocol) muxedStream.protocol = protocol muxedStream.sink = stream.sink @@ -152,7 +152,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio connection.streams.push(muxedStream) const { handler } = registrar.getHandler(protocol) - handler({ connection, stream: muxedStream }) + await handler({ connection, stream: muxedStream }) }).catch(err => { log.error(err) }) diff --git a/packages/interface/src/stream-handler.ts b/packages/interface/src/stream-handler.ts index 39a087a5f9..f6e362a3cb 100644 --- a/packages/interface/src/stream-handler.ts +++ b/packages/interface/src/stream-handler.ts @@ -17,7 +17,7 @@ export interface StreamHandler { /** * A callback function that accepts the incoming stream data */ - (data: IncomingStreamData): void + (data: IncomingStreamData): void | Promise } export interface StreamHandlerOptions extends AbortOptions { diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 833e6e6872..c5268be369 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -467,17 +467,11 @@ export class Upgrader implements UpgraderInterface { this.components.metrics?.trackProtocolStream(muxedStream, connection) - this._onStream({ connection, stream: muxedStream, protocol }) + await this._onStream({ connection, stream: muxedStream, protocol }) }) .catch(async err => { connection.log.error('error handling incoming stream id %s - %e', muxedStream.id, err) - - if (muxedStream.timeline.close == null) { - await muxedStream.close({ - signal - }) - .catch(err => muxedStream.abort(err)) - } + muxedStream.abort(err) }) } }) @@ -651,7 +645,7 @@ export class Upgrader implements UpgraderInterface { /** * Routes incoming streams to the correct handler */ - _onStream (opts: OnStreamOptions): void { + async _onStream (opts: OnStreamOptions): Promise { const { connection, stream, protocol } = opts const { handler, options } = this.components.registrar.getHandler(protocol) @@ -659,7 +653,7 @@ export class Upgrader implements UpgraderInterface { throw new LimitedConnectionError('Cannot open protocol stream on limited connection') } - handler({ connection, stream }) + await handler({ connection, stream }) } /**