Skip to content

Commit 0e19673

Browse files
authored
ref(core): Improve promise buffer (#17788)
Extracted this out of #17782, this improved our promise buffer class a bit: 1. Remove the code path without a `limit`, as we never use this (there is a default limit used). There is also really no reason to use this without a limit, the limit is the whole purpose of this class. 2. Use a `Set` instead of an array for the internal buffer handling, this should slightly streamline stuff. 3. For `drain`, we can simplify the implementation without a timeout drastically. We can use `Promise.race()` to handle this more gracefully, which should be supported everywhere. 4. Some slight refactorings, actually improving timing semantics slightly.
1 parent 80d5ff2 commit 0e19673

File tree

2 files changed

+180
-70
lines changed

2 files changed

+180
-70
lines changed

packages/core/src/utils/promisebuffer.ts

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { rejectedSyncPromise, resolvedSyncPromise, SyncPromise } from './syncpromise';
1+
import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';
22

33
export interface PromiseBuffer<T> {
44
// exposes the internal array so tests can assert on the state of it.
55
// XXX: this really should not be public api.
6-
$: Array<PromiseLike<T>>;
6+
$: PromiseLike<T>[];
77
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
88
drain(timeout?: number): PromiseLike<boolean>;
99
}
@@ -14,11 +14,11 @@ export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
1414
* Creates an new PromiseBuffer object with the specified limit
1515
* @param limit max number of promises that can be stored in the buffer
1616
*/
17-
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
18-
const buffer: Array<PromiseLike<T>> = [];
17+
export function makePromiseBuffer<T>(limit: number = 100): PromiseBuffer<T> {
18+
const buffer: Set<PromiseLike<T>> = new Set();
1919

2020
function isReady(): boolean {
21-
return limit === undefined || buffer.length < limit;
21+
return buffer.size < limit;
2222
}
2323

2424
/**
@@ -27,8 +27,8 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
2727
* @param task Can be any PromiseLike<T>
2828
* @returns Removed promise.
2929
*/
30-
function remove(task: PromiseLike<T>): PromiseLike<T | void> {
31-
return buffer.splice(buffer.indexOf(task), 1)[0] || Promise.resolve(undefined);
30+
function remove(task: PromiseLike<T>): void {
31+
buffer.delete(task);
3232
}
3333

3434
/**
@@ -48,19 +48,11 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
4848

4949
// start the task and add its promise to the queue
5050
const task = taskProducer();
51-
if (buffer.indexOf(task) === -1) {
52-
buffer.push(task);
53-
}
54-
void task
55-
.then(() => remove(task))
56-
// Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
57-
// rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
58-
// have promises, so TS has to polyfill when down-compiling.)
59-
.then(null, () =>
60-
remove(task).then(null, () => {
61-
// We have to add another catch here because `remove()` starts a new promise chain.
62-
}),
63-
);
51+
buffer.add(task);
52+
void task.then(
53+
() => remove(task),
54+
() => remove(task),
55+
);
6456
return task;
6557
}
6658

@@ -74,34 +66,28 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
7466
* `false` otherwise
7567
*/
7668
function drain(timeout?: number): PromiseLike<boolean> {
77-
return new SyncPromise<boolean>((resolve, reject) => {
78-
let counter = buffer.length;
69+
if (!buffer.size) {
70+
return resolvedSyncPromise(true);
71+
}
7972

80-
if (!counter) {
81-
return resolve(true);
82-
}
73+
// We want to resolve even if one of the promises rejects
74+
const drainPromise = Promise.allSettled(Array.from(buffer)).then(() => true);
75+
76+
if (!timeout) {
77+
return drainPromise;
78+
}
8379

84-
// wait for `timeout` ms and then resolve to `false` (if not cancelled first)
85-
const capturedSetTimeout = setTimeout(() => {
86-
if (timeout && timeout > 0) {
87-
resolve(false);
88-
}
89-
}, timeout);
80+
const promises = [drainPromise, new Promise<boolean>(resolve => setTimeout(() => resolve(false), timeout))];
9081

91-
// if all promises resolve in time, cancel the timer and resolve to `true`
92-
buffer.forEach(item => {
93-
void resolvedSyncPromise(item).then(() => {
94-
if (!--counter) {
95-
clearTimeout(capturedSetTimeout);
96-
resolve(true);
97-
}
98-
}, reject);
99-
});
100-
});
82+
// Promise.race will resolve to the first promise that resolves or rejects
83+
// So if the drainPromise resolves, the timeout promise will be ignored
84+
return Promise.race(promises);
10185
}
10286

10387
return {
104-
$: buffer,
88+
get $(): PromiseLike<T>[] {
89+
return Array.from(buffer);
90+
},
10591
add,
10692
drain,
10793
};

packages/core/test/lib/utils/promisebuffer.test.ts

Lines changed: 152 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,163 @@
11
import { describe, expect, test, vi } from 'vitest';
22
import { makePromiseBuffer } from '../../../src/utils/promisebuffer';
3-
import { SyncPromise } from '../../../src/utils/syncpromise';
3+
import { rejectedSyncPromise, resolvedSyncPromise } from '../../../src/utils/syncpromise';
44

55
describe('PromiseBuffer', () => {
66
describe('add()', () => {
7-
test('no limit', () => {
8-
const buffer = makePromiseBuffer();
9-
const p = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
10-
void buffer.add(p);
11-
expect(buffer.$.length).toEqual(1);
7+
test('enforces limit of promises', async () => {
8+
const buffer = makePromiseBuffer(5);
9+
10+
const producer1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
11+
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
12+
const producer3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
13+
const producer4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
14+
const producer5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
15+
const producer6 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
16+
17+
void buffer.add(producer1);
18+
void buffer.add(producer2);
19+
void buffer.add(producer3);
20+
void buffer.add(producer4);
21+
void buffer.add(producer5);
22+
await expect(buffer.add(producer6)).rejects.toThrowError();
23+
24+
expect(producer1).toHaveBeenCalledTimes(1);
25+
expect(producer2).toHaveBeenCalledTimes(1);
26+
expect(producer3).toHaveBeenCalledTimes(1);
27+
expect(producer4).toHaveBeenCalledTimes(1);
28+
expect(producer5).toHaveBeenCalledTimes(1);
29+
expect(producer6).not.toHaveBeenCalled();
30+
31+
expect(buffer.$.length).toEqual(5);
32+
33+
await buffer.drain();
34+
35+
expect(buffer.$.length).toEqual(0);
36+
37+
expect(producer1).toHaveBeenCalledTimes(1);
38+
expect(producer2).toHaveBeenCalledTimes(1);
39+
expect(producer3).toHaveBeenCalledTimes(1);
40+
expect(producer4).toHaveBeenCalledTimes(1);
41+
expect(producer5).toHaveBeenCalledTimes(1);
42+
expect(producer6).not.toHaveBeenCalled();
43+
});
44+
45+
test('sync promises', async () => {
46+
const buffer = makePromiseBuffer(1);
47+
let task1;
48+
const producer1 = vi.fn(() => {
49+
task1 = resolvedSyncPromise();
50+
return task1;
51+
});
52+
const producer2 = vi.fn(() => resolvedSyncPromise());
53+
expect(buffer.add(producer1)).toEqual(task1);
54+
const add2 = buffer.add(producer2);
55+
56+
// This is immediately executed and removed again from the buffer
57+
expect(buffer.$.length).toEqual(0);
58+
59+
await expect(add2).resolves.toBeUndefined();
60+
61+
expect(producer1).toHaveBeenCalled();
62+
expect(producer2).toHaveBeenCalled();
1263
});
1364

14-
test('with limit', () => {
65+
test('async promises', async () => {
1566
const buffer = makePromiseBuffer(1);
1667
let task1;
1768
const producer1 = vi.fn(() => {
18-
task1 = new SyncPromise(resolve => setTimeout(resolve));
69+
task1 = new Promise(resolve => setTimeout(resolve, 1));
1970
return task1;
2071
});
21-
const producer2 = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
72+
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
2273
expect(buffer.add(producer1)).toEqual(task1);
23-
void expect(buffer.add(producer2)).rejects.toThrowError();
74+
const add2 = buffer.add(producer2);
75+
2476
expect(buffer.$.length).toEqual(1);
77+
78+
await expect(add2).rejects.toThrowError();
79+
2580
expect(producer1).toHaveBeenCalled();
2681
expect(producer2).not.toHaveBeenCalled();
2782
});
83+
84+
test('handles multiple equivalent promises', async () => {
85+
const buffer = makePromiseBuffer(10);
86+
87+
const promise = new Promise(resolve => setTimeout(resolve, 1));
88+
89+
const producer = vi.fn(() => promise);
90+
const producer2 = vi.fn(() => promise);
91+
92+
expect(buffer.add(producer)).toEqual(promise);
93+
expect(buffer.add(producer2)).toEqual(promise);
94+
95+
expect(buffer.$.length).toEqual(1);
96+
97+
expect(producer).toHaveBeenCalled();
98+
expect(producer2).toHaveBeenCalled();
99+
100+
await buffer.drain();
101+
102+
expect(buffer.$.length).toEqual(0);
103+
});
28104
});
29105

30106
describe('drain()', () => {
31-
test('without timeout', async () => {
107+
test('drains all promises without timeout', async () => {
32108
const buffer = makePromiseBuffer();
33-
for (let i = 0; i < 5; i++) {
34-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve)));
35-
}
109+
110+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
111+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
112+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
113+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
114+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
115+
116+
[p1, p2, p3, p4, p5].forEach(p => {
117+
void buffer.add(p);
118+
});
119+
36120
expect(buffer.$.length).toEqual(5);
37121
const result = await buffer.drain();
38122
expect(result).toEqual(true);
39123
expect(buffer.$.length).toEqual(0);
124+
125+
expect(p1).toHaveBeenCalled();
126+
expect(p2).toHaveBeenCalled();
127+
expect(p3).toHaveBeenCalled();
128+
expect(p4).toHaveBeenCalled();
129+
expect(p5).toHaveBeenCalled();
40130
});
41131

42-
test('with timeout', async () => {
132+
test('drains all promises with timeout', async () => {
43133
const buffer = makePromiseBuffer();
44-
for (let i = 0; i < 5; i++) {
45-
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100)));
46-
}
134+
135+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2)));
136+
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 4)));
137+
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 6)));
138+
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 8)));
139+
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 10)));
140+
141+
[p1, p2, p3, p4, p5].forEach(p => {
142+
void buffer.add(p);
143+
});
144+
145+
expect(p1).toHaveBeenCalled();
146+
expect(p2).toHaveBeenCalled();
147+
expect(p3).toHaveBeenCalled();
148+
expect(p4).toHaveBeenCalled();
149+
expect(p5).toHaveBeenCalled();
150+
47151
expect(buffer.$.length).toEqual(5);
48-
const result = await buffer.drain(50);
152+
const result = await buffer.drain(8);
49153
expect(result).toEqual(false);
154+
// p5 is still in the buffer
155+
expect(buffer.$.length).toEqual(1);
156+
157+
// Now drain final item
158+
const result2 = await buffer.drain();
159+
expect(result2).toEqual(true);
160+
expect(buffer.$.length).toEqual(0);
50161
});
51162

52163
test('on empty buffer', async () => {
@@ -56,11 +167,26 @@ describe('PromiseBuffer', () => {
56167
expect(result).toEqual(true);
57168
expect(buffer.$.length).toEqual(0);
58169
});
170+
171+
test('resolves even if one of the promises rejects', async () => {
172+
const buffer = makePromiseBuffer();
173+
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
174+
const p2 = vi.fn(() => new Promise((_, reject) => setTimeout(() => reject(new Error('whoops')), 1)));
175+
void buffer.add(p1);
176+
void buffer.add(p2);
177+
178+
const result = await buffer.drain();
179+
expect(result).toEqual(true);
180+
expect(buffer.$.length).toEqual(0);
181+
182+
expect(p1).toHaveBeenCalled();
183+
expect(p2).toHaveBeenCalled();
184+
});
59185
});
60186

61187
test('resolved promises should not show up in buffer length', async () => {
62188
const buffer = makePromiseBuffer();
63-
const producer = () => new SyncPromise(resolve => setTimeout(resolve));
189+
const producer = () => new Promise(resolve => setTimeout(resolve, 1));
64190
const task = buffer.add(producer);
65191
expect(buffer.$.length).toEqual(1);
66192
await task;
@@ -69,20 +195,18 @@ describe('PromiseBuffer', () => {
69195

70196
test('rejected promises should not show up in buffer length', async () => {
71197
const buffer = makePromiseBuffer();
72-
const producer = () => new SyncPromise((_, reject) => setTimeout(reject));
198+
const error = new Error('whoops');
199+
const producer = () => new Promise((_, reject) => setTimeout(() => reject(error), 1));
73200
const task = buffer.add(producer);
74201
expect(buffer.$.length).toEqual(1);
75-
try {
76-
await task;
77-
} catch {
78-
// no-empty
79-
}
202+
203+
await expect(task).rejects.toThrow(error);
80204
expect(buffer.$.length).toEqual(0);
81205
});
82206

83207
test('resolved task should give an access to the return value', async () => {
84208
const buffer = makePromiseBuffer<string>();
85-
const producer = () => new SyncPromise<string>(resolve => setTimeout(() => resolve('test')));
209+
const producer = () => resolvedSyncPromise('test');
86210
const task = buffer.add(producer);
87211
const result = await task;
88212
expect(result).toEqual('test');
@@ -91,7 +215,7 @@ describe('PromiseBuffer', () => {
91215
test('rejected task should give an access to the return value', async () => {
92216
expect.assertions(1);
93217
const buffer = makePromiseBuffer<string>();
94-
const producer = () => new SyncPromise<string>((_, reject) => setTimeout(() => reject(new Error('whoops'))));
218+
const producer = () => rejectedSyncPromise(new Error('whoops'));
95219
const task = buffer.add(producer);
96220
try {
97221
await task;

0 commit comments

Comments
 (0)