Skip to content

Commit 53001ad

Browse files
authored
fix: use .close() to close streams (#3191)
Instead of the weird double-sink of `[]`, use the `.close` method to explicitly close streams and muxers.
1 parent 6ced1c8 commit 53001ad

File tree

1 file changed

+53
-37
lines changed
  • packages/interface-compliance-tests/src/stream-muxer

1 file changed

+53
-37
lines changed

packages/interface-compliance-tests/src/stream-muxer/base-test.ts

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { expect } from 'aegir/chai'
22
import all from 'it-all'
33
import { byteStream } from 'it-byte-stream'
4-
import drain from 'it-drain'
54
import map from 'it-map'
65
import { duplexPair } from 'it-pair/duplex'
76
import { pipe } from 'it-pipe'
@@ -12,22 +11,19 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
1211
import { isValidTick } from '../is-valid-tick.js'
1312
import type { TestSetup } from '../index.js'
1413
import type { Stream, StreamMuxerFactory } from '@libp2p/interface'
15-
import type { Source, Duplex } from 'it-stream-types'
14+
import type { Source } from 'it-stream-types'
1615
import type { DeferredPromise } from 'p-defer'
1716

18-
async function drainAndClose (stream: Duplex<any>): Promise<void> {
19-
await pipe([], stream, drain)
20-
}
21-
2217
export default (common: TestSetup<StreamMuxerFactory>): void => {
2318
describe('base', () => {
24-
it('Open a stream from the dialer', async () => {
19+
it('should open a stream from the dialer', async () => {
2520
const p = duplexPair<Uint8Array | Uint8ArrayList>()
26-
const dialerFactory = await common.setup()
27-
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
2821
const onStreamPromise: DeferredPromise<Stream> = defer()
2922
const onStreamEndPromise: DeferredPromise<Stream> = defer()
3023

24+
const dialerFactory = await common.setup()
25+
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
26+
3127
const listenerFactory = await common.setup()
3228
const listener = listenerFactory.createStreamMuxer({
3329
direction: 'inbound',
@@ -42,18 +38,20 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
4238
void pipe(p[0], dialer, p[0])
4339
void pipe(p[1], listener, p[1])
4440

45-
const conn = await dialer.newStream()
46-
expect(dialer.streams).to.include(conn)
47-
expect(isValidTick(conn.timeline.open)).to.equal(true)
41+
const dialerStream = await dialer.newStream()
42+
expect(dialer.streams).to.include(dialerStream)
43+
expect(isValidTick(dialerStream.timeline.open)).to.equal(true)
4844

49-
void drainAndClose(conn)
45+
const dialerBytes = byteStream(dialerStream)
46+
void dialerBytes.write(uint8ArrayFromString('hello'))
5047

51-
const stream = await onStreamPromise.promise
52-
expect(isValidTick(stream.timeline.open)).to.equal(true)
48+
const listenerStream = await onStreamPromise.promise
49+
expect(isValidTick(listenerStream.timeline.open)).to.equal(true)
5350
// Make sure the stream is being tracked
54-
expect(listener.streams).to.include(stream)
51+
expect(listener.streams).to.include(listenerStream)
5552

56-
void drainAndClose(stream)
53+
await dialerStream.close()
54+
await listenerStream.close()
5755

5856
// Make sure stream is closed properly
5957
const endedStream = await onStreamEndPromise.promise
@@ -66,22 +64,26 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
6664
// Make sure the stream is removed from tracking
6765
expect(isValidTick(endedStream.timeline.close)).to.equal(true)
6866

69-
await drainAndClose(dialer)
70-
await drainAndClose(listener)
67+
await dialer.close()
68+
await listener.close()
7169

7270
// ensure we have no streams left
7371
expect(dialer.streams).to.have.length(0)
7472
expect(listener.streams).to.have.length(0)
7573
})
7674

77-
it('Open a stream from the listener', async () => {
75+
it('should open a stream from the listener', async () => {
7876
const p = duplexPair<Uint8Array | Uint8ArrayList>()
7977
const onStreamPromise: DeferredPromise<Stream> = defer()
78+
const onStreamEndPromise: DeferredPromise<Stream> = defer()
8079
const dialerFactory = await common.setup()
8180
const dialer = dialerFactory.createStreamMuxer({
8281
direction: 'outbound',
8382
onIncomingStream: (stream: Stream) => {
8483
onStreamPromise.resolve(stream)
84+
},
85+
onStreamEnd: (stream) => {
86+
onStreamEndPromise.resolve(stream)
8587
}
8688
})
8789

@@ -91,21 +93,35 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
9193
void pipe(p[0], dialer, p[0])
9294
void pipe(p[1], listener, p[1])
9395

94-
const conn = await listener.newStream()
96+
const listenerStream = await listener.newStream()
97+
const listenerBytes = byteStream(listenerStream)
98+
void listenerBytes.write(uint8ArrayFromString('hello'))
9599

96-
void drainAndClose(conn)
100+
const dialerStream = await onStreamPromise.promise
97101

98-
const stream = await onStreamPromise.promise
99-
expect(isValidTick(stream.timeline.open)).to.equal(true)
100-
expect(listener.streams).to.include(conn)
101-
expect(isValidTick(conn.timeline.open)).to.equal(true)
102-
void drainAndClose(stream)
102+
expect(isValidTick(dialerStream.timeline.open)).to.equal(true)
103+
expect(listener.streams).to.include(listenerStream)
104+
expect(isValidTick(listenerStream.timeline.open)).to.equal(true)
105+
106+
await dialerStream.close()
107+
await listenerStream.close()
108+
109+
// Make sure stream is closed properly
110+
const endedStream = await onStreamEndPromise.promise
111+
expect(dialer.streams).to.not.include(endedStream)
112+
113+
if (endedStream.timeline.close == null) {
114+
throw new Error('timeline had no close time')
115+
}
116+
117+
// Make sure the stream is removed from tracking
118+
expect(isValidTick(endedStream.timeline.close)).to.equal(true)
103119

104-
await drainAndClose(dialer)
105-
await drainAndClose(listener)
120+
await dialer.close()
121+
await listener.close()
106122
})
107123

108-
it('Open a stream on both sides', async () => {
124+
it('should open a stream on both sides', async () => {
109125
const p = duplexPair<Uint8Array | Uint8ArrayList>()
110126
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
111127
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
@@ -132,19 +148,19 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
132148
const listenerInitiatorStream = await listener.newStream()
133149

134150
await Promise.all([
135-
drainAndClose(dialerInitiatorStream),
136-
drainAndClose(listenerInitiatorStream),
137-
onDialerStreamPromise.promise.then(async stream => { await drainAndClose(stream) }),
138-
onListenerStreamPromise.promise.then(async stream => { await drainAndClose(stream) })
151+
dialerInitiatorStream.close(),
152+
listenerInitiatorStream.close(),
153+
onDialerStreamPromise.promise.then(async stream => { await stream.close() }),
154+
onListenerStreamPromise.promise.then(async stream => { await stream.close() })
139155
])
140156

141157
await Promise.all([
142-
drainAndClose(dialer),
143-
drainAndClose(listener)
158+
dialer.close(),
159+
listener.close()
144160
])
145161
})
146162

147-
it('Open a stream on one side, write, open a stream on the other side', async () => {
163+
it('should open a stream on one side, write, open a stream on the other side', async () => {
148164
const toString = (source: Source<Uint8ArrayList>): AsyncGenerator<string> => map(source, (u) => uint8ArrayToString(u.subarray()))
149165
const p = duplexPair<Uint8Array | Uint8ArrayList>()
150166
const onDialerStreamPromise: DeferredPromise<Stream> = defer()

0 commit comments

Comments
 (0)