Skip to content

Commit ce6b542

Browse files
authored
fix: scope logging to connection and stream (#3215)
Updates log scope for streams to use the muxer scope, which uses the connection scope so we can trace log lines per stream/connection instead of having component scope.
1 parent 58abe87 commit ce6b542

File tree

56 files changed

+1128
-1010
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1128
-1010
lines changed

packages/connection-encrypter-plaintext/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
"@libp2p/interface": "^2.10.5",
5050
"@libp2p/peer-id": "^5.1.8",
5151
"it-protobuf-stream": "^2.0.2",
52-
"it-stream-types": "^2.0.2",
5352
"protons-runtime": "^5.5.0",
5453
"uint8arraylist": "^2.4.8",
5554
"uint8arrays": "^5.1.0"

packages/connection-encrypter-plaintext/src/index.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ import { peerIdFromPublicKey } from '@libp2p/peer-id'
2727
import { pbStream } from 'it-protobuf-stream'
2828
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
2929
import { Exchange, KeyType } from './pb/proto.js'
30-
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PrivateKey, SecureConnectionOptions } from '@libp2p/interface'
31-
import type { Duplex } from 'it-stream-types'
32-
import type { Uint8ArrayList } from 'uint8arraylist'
30+
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PrivateKey, SecureConnectionOptions, SecurableStream } from '@libp2p/interface'
3331

3432
const PROTOCOL = '/plaintext/2.0.0'
3533

@@ -54,21 +52,22 @@ class Plaintext implements ConnectionEncrypter {
5452
'@libp2p/connection-encryption'
5553
]
5654

57-
async secureInbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
55+
async secureInbound<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
5856
return this._encrypt(conn, options)
5957
}
6058

61-
async secureOutbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
59+
async secureOutbound<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
6260
return this._encrypt(conn, options)
6361
}
6462

6563
/**
6664
* Encrypt connection
6765
*/
68-
async _encrypt<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
66+
async _encrypt<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
67+
const log = conn.log?.newScope('plaintext') ?? this.log
6968
const pb = pbStream(conn).pb(Exchange)
7069

71-
this.log('write pubkey exchange to peer %p', options?.remotePeer)
70+
log('write pubkey exchange to peer %p', options?.remotePeer)
7271

7372
const publicKey = this.privateKey.publicKey
7473

@@ -108,15 +107,15 @@ class Plaintext implements ConnectionEncrypter {
108107
throw new InvalidCryptoExchangeError('Public key did not match id')
109108
}
110109
} catch (err: any) {
111-
this.log.error(err)
110+
log.error(err)
112111
throw new InvalidCryptoExchangeError('Invalid public key - ' + err.message)
113112
}
114113

115114
if (options?.remotePeer != null && !peerId.equals(options?.remotePeer)) {
116115
throw new UnexpectedPeerError()
117116
}
118117

119-
this.log('plaintext key exchange completed successfully with peer %p', peerId)
118+
log('plaintext key exchange completed successfully with peer %p', peerId)
120119

121120
return {
122121
conn: pb.unwrap().unwrap(),

packages/connection-encrypter-tls/src/tls.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ import { HandshakeTimeoutError } from './errors.js'
2424
import { generateCertificate, verifyPeerCertificate, itToStream, streamToIt } from './utils.js'
2525
import { PROTOCOL } from './index.js'
2626
import type { TLSComponents } from './index.js'
27-
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, Logger, SecureConnectionOptions, CounterGroup, StreamMuxerFactory } from '@libp2p/interface'
28-
import type { Duplex } from 'it-stream-types'
27+
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, Logger, SecureConnectionOptions, CounterGroup, StreamMuxerFactory, SecurableStream } from '@libp2p/interface'
2928
import type { TLSSocketOptions } from 'node:tls'
30-
import type { Uint8ArrayList } from 'uint8arraylist'
3129

3230
export class TLS implements ConnectionEncrypter {
3331
public protocol: string = PROTOCOL
@@ -77,18 +75,19 @@ export class TLS implements ConnectionEncrypter {
7775
'@libp2p/connection-encryption'
7876
]
7977

80-
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
78+
async secureInbound <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
8179
return this._encrypt(conn, true, options)
8280
}
8381

84-
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
82+
async secureOutbound <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
8583
return this._encrypt(conn, false, options)
8684
}
8785

8886
/**
8987
* Encrypt connection
9088
*/
91-
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
89+
async _encrypt <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
90+
const log = conn.log?.newScope('tls') ?? this.log
9291
let streamMuxer: StreamMuxerFactory | undefined
9392

9493
let streamMuxers: string[] = []
@@ -112,7 +111,7 @@ export class TLS implements ConnectionEncrypter {
112111
'libp2p'
113112
],
114113
ALPNCallback: ({ protocols }) => {
115-
this.log.trace('received protocols %s', protocols)
114+
log.trace('received protocols %s', protocols)
116115
let chosenProtocol: string | undefined
117116

118117
for (const protocol of protocols) {
@@ -165,7 +164,7 @@ export class TLS implements ConnectionEncrypter {
165164

166165
verifyPeerCertificate(remote.raw, options?.remotePeer, this.log)
167166
.then(remotePeer => {
168-
this.log('remote certificate ok, remote peer %p', remotePeer)
167+
log('remote certificate ok, remote peer %p', remotePeer)
169168

170169
// 'libp2p' is a special protocol - if it's sent the remote does not
171170
// support early muxer negotiation
@@ -175,7 +174,7 @@ export class TLS implements ConnectionEncrypter {
175174

176175
if (streamMuxer == null) {
177176
const err = new InvalidCryptoExchangeError(`Selected muxer ${socket.alpnProtocol} did not exist`)
178-
this.log.error(`Selected muxer ${socket.alpnProtocol} did not exist - %e`, err)
177+
log.error(`Selected muxer ${socket.alpnProtocol} did not exist - %e`, err)
179178

180179
if (isAbortable(conn)) {
181180
conn.abort(err)

packages/connection-encrypter-tls/test/index.spec.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,14 @@ describe('tls', () => {
4747

4848
await Promise.all([
4949
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
50-
...inbound
50+
...inbound,
51+
log: defaultLogger().forComponent('inbound')
5152
}), {
5253
remotePeer
5354
}),
5455
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
55-
...outbound
56+
...outbound,
57+
log: defaultLogger().forComponent('outbound')
5658
}), {
5759
remotePeer: wrongPeer
5860
})
@@ -81,12 +83,14 @@ describe('tls', () => {
8183

8284
await expect(Promise.all([
8385
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
84-
...inbound
86+
...inbound,
87+
log: defaultLogger().forComponent('inbound')
8588
}), {
8689
remotePeer
8790
}),
8891
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
89-
...outbound
92+
...outbound,
93+
log: defaultLogger().forComponent('outbound')
9094
}), {
9195
remotePeer: localPeer
9296
})
@@ -99,12 +103,14 @@ describe('tls', () => {
99103

100104
const result = await Promise.all([
101105
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
102-
...inbound
106+
...inbound,
107+
log: defaultLogger().forComponent('inbound')
103108
}), {
104109
remotePeer: localPeer
105110
}),
106111
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
107-
...outbound
112+
...outbound,
113+
log: defaultLogger().forComponent('outbound')
108114
}), {
109115
remotePeer: localPeer
110116
})
@@ -119,13 +125,15 @@ describe('tls', () => {
119125

120126
const result = await Promise.all([
121127
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
122-
...inbound
128+
...inbound,
129+
log: defaultLogger().forComponent('inbound')
123130
}), {
124131
remotePeer: localPeer,
125132
skipStreamMuxerNegotiation: true
126133
}),
127134
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
128-
...outbound
135+
...outbound,
136+
log: defaultLogger().forComponent('outbound')
129137
}), {
130138
remotePeer: localPeer,
131139
skipStreamMuxerNegotiation: true

packages/interface-compliance-tests/src/mocks/connection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
137137
const log = logger.forComponent('libp2p:mock-muxer')
138138

139139
const muxer = muxerFactory.createStreamMuxer({
140+
log,
140141
direction,
141142
onIncomingStream: (muxedStream) => {
142143
try {

packages/interface-compliance-tests/src/mocks/muxer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger } from '@libp2p/logger'
1+
import { defaultLogger, logger } from '@libp2p/logger'
22
import { AbstractStream } from '@libp2p/utils/abstract-stream'
33
import { abortableSource } from 'abortable-iterator'
44
import map from 'it-map'
@@ -131,7 +131,7 @@ class MockMuxer implements StreamMuxer {
131131
this.registryInitiatorStreams = new Map()
132132
this.registryRecipientStreams = new Map()
133133
this.log('create muxer')
134-
this.options = init ?? { direction: 'inbound' }
134+
this.options = init ?? { direction: 'inbound', log: defaultLogger().forComponent('mock-muxer') }
135135
this.closeController = new AbortController()
136136
// receives data from the muxer at the other end of the stream
137137
this.source = this.input = pushable({

packages/interface-compliance-tests/src/stream-muxer/base-test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
2222
const onStreamEndPromise: DeferredPromise<Stream> = defer()
2323

2424
const dialerFactory = await common.setup()
25-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
25+
const dialer = dialerFactory.createStreamMuxer({
26+
direction: 'outbound'
27+
})
2628

2729
const listenerFactory = await common.setup()
2830
const listener = listenerFactory.createStreamMuxer({
@@ -88,7 +90,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
8890
})
8991

9092
const listenerFactory = await common.setup()
91-
const listener = listenerFactory.createStreamMuxer({ direction: 'inbound' })
93+
const listener = listenerFactory.createStreamMuxer({
94+
direction: 'inbound'
95+
})
9296

9397
void pipe(p[0], dialer, p[0])
9498
void pipe(p[1], listener, p[1])

packages/interface-compliance-tests/src/stream-muxer/close-test.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
7272
let openedStreams = 0
7373
const expectedStreams = 5
7474
const dialerFactory = await common.setup()
75-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
75+
const dialer = dialerFactory.createStreamMuxer({
76+
direction: 'outbound'
77+
})
7678

7779
// Listener is echo server :)
7880
const listenerFactory = await common.setup()
@@ -114,7 +116,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
114116
let openedStreams = 0
115117
const expectedStreams = 5
116118
const dialerFactory = await common.setup()
117-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
119+
const dialer = dialerFactory.createStreamMuxer({
120+
direction: 'outbound'
121+
})
118122

119123
// Listener is echo server :)
120124
const listenerFactory = await common.setup()
@@ -157,7 +161,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
157161
let openedStreams = 0
158162
const expectedStreams = 5
159163
const dialerFactory = await common.setup()
160-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
164+
const dialer = dialerFactory.createStreamMuxer({
165+
direction: 'outbound'
166+
})
161167

162168
// Listener is echo server :)
163169
const listenerFactory = await common.setup()
@@ -212,7 +218,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
212218

213219
it('calling newStream after close throws an error', async () => {
214220
const dialerFactory = await common.setup()
215-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
221+
const dialer = dialerFactory.createStreamMuxer({
222+
direction: 'outbound'
223+
})
216224

217225
await dialer.close()
218226

@@ -227,7 +235,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
227235
it('closing one of the muxed streams doesn\'t close others', async () => {
228236
const p = duplexPair<Uint8Array | Uint8ArrayList>()
229237
const dialerFactory = await common.setup()
230-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
238+
const dialer = dialerFactory.createStreamMuxer({
239+
direction: 'outbound'
240+
})
231241

232242
// Listener is echo server :)
233243
const listenerFactory = await common.setup()
@@ -280,7 +290,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
280290

281291
const p = duplexPair<Uint8Array | Uint8ArrayList>()
282292
const dialerFactory = await common.setup()
283-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
293+
const dialer = dialerFactory.createStreamMuxer({
294+
direction: 'outbound'
295+
})
284296
const data = [randomBuffer(), randomBuffer()]
285297

286298
const listenerFactory = await common.setup()
@@ -325,7 +337,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
325337
const deferred = pDefer<Uint8ArrayList[]>()
326338
const p = duplexPair<Uint8Array | Uint8ArrayList>()
327339
const dialerFactory = await common.setup()
328-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
340+
const dialer = dialerFactory.createStreamMuxer({
341+
direction: 'outbound'
342+
})
329343
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))
330344
const expected = toBuffer(data.map(d => d.subarray()))
331345

@@ -391,7 +405,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
391405

392406
const p = duplexPair<Uint8Array | Uint8ArrayList>()
393407
const dialerFactory = await common.setup()
394-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
408+
const dialer = dialerFactory.createStreamMuxer({
409+
direction: 'outbound'
410+
})
395411

396412
const listenerFactory = await common.setup()
397413
const listener = listenerFactory.createStreamMuxer({

packages/interface-compliance-tests/src/stream-muxer/spawner.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMux
2525
.catch(err => { stream.abort(err) })
2626
}
2727
})
28-
const dialer = await createMuxer({ direction: 'outbound' })
28+
const dialer = await createMuxer({
29+
direction: 'outbound'
30+
})
2931

3032
void pipe(listenerSocket, listener, listenerSocket)
3133
void pipe(dialerSocket, dialer, dialerSocket)

packages/interface/src/connection-encrypter.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { MultiaddrConnection } from './connection.js'
2-
import type { AbortOptions, StreamMuxerFactory } from './index.js'
2+
import type { AbortOptions, Logger, StreamMuxerFactory } from './index.js'
33
import type { PeerId } from './peer-id.js'
44
import type { Duplex } from 'it-stream-types'
55
import type { Uint8ArrayList } from 'uint8arraylist'
@@ -21,6 +21,13 @@ export interface SecureConnectionOptions extends AbortOptions {
2121
skipStreamMuxerNegotiation?: boolean
2222
}
2323

24+
/**
25+
* A stream with an optional logger
26+
*/
27+
export interface SecurableStream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
28+
log?: Logger
29+
}
30+
2431
/**
2532
* A libp2p connection encrypter module must be compliant to this interface
2633
* to ensure all exchanged data between two peers is encrypted.
@@ -33,14 +40,14 @@ export interface ConnectionEncrypter<Extension = unknown> {
3340
* pass it for extra verification, otherwise it will be determined during
3441
* the handshake.
3542
*/
36-
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
43+
secureOutbound <Stream extends SecurableStream = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
3744

3845
/**
3946
* Decrypt incoming data. If the remote PeerId is known,
4047
* pass it for extra verification, otherwise it will be determined during
4148
* the handshake
4249
*/
43-
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
50+
secureInbound <Stream extends SecurableStream = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
4451
}
4552

4653
export interface SecuredConnection<Stream = any, Extension = unknown> {

0 commit comments

Comments
 (0)