diff --git a/README.md b/README.md index d2297eb..989914f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,9 @@ The official Push Notification adapter for Parse Server. See [Parse Server Push - [HTTP/1.1 Legacy Option](#http11-legacy-option) - [Firebase Client Error](#firebase-client-error) - [Expo Push Options](#expo-push-options) +- [Push Queue](#push-queue) + - [Throttling](#throttling) + - [Push Options](#push-options) - [Bundled with Parse Server](#bundled-with-parse-server) - [Logging](#logging) @@ -181,6 +184,83 @@ expo: { For more information see the [Expo docs](https://docs.expo.dev/push-notifications/overview/). +## Push Queue + +### Throttling + +By default, pushes are sent as fast as possible. However, push providers usually throttle their APIs, so that sending too many pushes notifications within a short time may cause the API to reject requests. To address this, push sending can be throttled per provider by adding the `queue` option to the respective push configuration. + +| Parameter | Default | Optional | Description | +|--------------------------|------------|----------|--------------------------------------------------------------------------------------------| +| `queue.concurrency` | `Infinity` | Yes | The maximum number of pushes to process concurrently. | +| `queue.intervalCapacity` | `Infinity` | Yes | The interval capacity, meaning the maximum number of tasks to process in a given interval. | +| `queue.interval` | `0` | Yes | The interval in milliseconds for the interval capacity. | + +Example configuration to throttle the queue to max. 1 push every 100ms, equivalent to max. 10 pushes per second: + +```js +const parseServerOptions = { + push: { + adapter: new ParsePushAdapter({ + ios: { + // ... + queue: { + concurrency: 1, + intervalCapacity: 1, + interval: 100, + }, + } + }) + } +}; +``` + +Keep in mind that `concurrency: 1` means that pushes are sent in serial. For example, if sending a request to the push provider takes up to 500ms to complete, then the configuration above may be limited to only 2 pushes per second if every request takes 500ms. To address this, you can send pushes in parallel by setting the concurrency to a value greater than `1`, and increasing `intervalCapacity` and `interval` to fully utilize parallelism. + +Example configuration sending pushes in parallel: + +```js +const parseServerOptions = { + push: { + adapter: new ParsePushAdapter({ + ios: { + // ... + queue: { + concurrency: 5, + intervalCapacity: 5, + interval: 500, + }, + } + }) + } +}; +``` + +In the example above, pushes will be sent in bursts of 5 at once, with max. 10 pushes within 1s. On a timeline that means at `t=0ms`, 5 pushes will be sent in parallel. If sending the pushes take less than 500ms, then `intervalCapacity` will still limit to 5 pushes within the first 500ms. At `t=500ms` the second interval begins and another max. 5 pushes are sent in parallel. That effectively means a throughput of up to 10 pushes per second. + +### Push Options + +Each push request may specify the following options for handling in the queue. + +| Parameter | Default | Optional | Description | +|------------------|------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `queue.ttl` | `Infinity` | Yes | The time-to-live of the push in the queue in seconds. If a queued push expires before it is sent to the push provider, it is discarded. Default is `Infinity`, meaning pushes never expire. | +| `queue.priority` | `0` | Yes | The priority of the push in the queue. When processing the queue, pushes are sent in order of their priority. For example, a push with priority `1` is sent before a push with priority `0`. | + +Example push payload: + +```js +pushData = { + queue: { + // Discard after 10 seconds from queue if push has not been sent to push provider yet + ttl: 10, + // Send with higher priority than default pushes + priority: 1, + }, + data: { alert: 'Hello' } +}; +``` + ## Bundled with Parse Server Parse Server already comes bundled with a specific version of the push adapter. This installation is only necessary when customizing the push adapter version that should be used by Parse Server. When using a customized version of the push adapter, ensure that it's compatible with the version of Parse Server you are using. diff --git a/package-lock.json b/package-lock.json index 4396004..cc6096e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "expo-server-sdk": "3.15.0", "firebase-admin": "13.4.0", "npmlog": "7.0.1", + "p-queue": "8.1.0", "parse": "6.1.1", "web-push": "3.6.7" }, @@ -3390,6 +3391,11 @@ "node": ">=6" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -8244,6 +8250,21 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-reduce": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-reduce/-/p-reduce-2.1.0.tgz", @@ -8253,6 +8274,17 @@ "node": ">=8" } }, + "node_modules/p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/package-json-from-dist": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz", @@ -12773,6 +12805,11 @@ "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==" }, + "eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -16181,12 +16218,26 @@ "integrity": "sha512-z4cYYMMdKHzw4O5UkWJImbZynVIo0lSGTXc7bzB1e/rrDqkgGUNysK/o4bTr+0+xKvvLoTyGqYC4Fgljy9qe1Q==", "dev": true }, + "p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "requires": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + } + }, "p-reduce": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-reduce/-/p-reduce-2.1.0.tgz", "integrity": "sha512-2USApvnsutq8uoxZBGbbWM0JIYLiEMJ9RlaN7fAzVNb9OZN0SHjjTTfIcb667XynS5Y1VhwDJVDa72TnPzAYWw==", "dev": true }, + "p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==" + }, "package-json-from-dist": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz", diff --git a/package.json b/package.json index 2c27acc..5da7b97 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "expo-server-sdk": "3.15.0", "firebase-admin": "13.4.0", "npmlog": "7.0.1", + "p-queue": "8.1.0", "parse": "6.1.1", "web-push": "3.6.7" }, diff --git a/spec/.eslintrc.json b/spec/.eslintrc.json index 44ad90c..af3c16b 100644 --- a/spec/.eslintrc.json +++ b/spec/.eslintrc.json @@ -1,6 +1,6 @@ { "env": { - "jasmine": true + "jasmine": true }, "globals": { "Parse": true diff --git a/spec/ParsePushAdapter.spec.js b/spec/ParsePushAdapter.spec.js index 40d8d0b..0401c57 100644 --- a/spec/ParsePushAdapter.spec.js +++ b/spec/ParsePushAdapter.spec.js @@ -1,15 +1,16 @@ -import { join } from 'path'; -import log from 'npmlog'; import apn from '@parse/node-apn'; -import ParsePushAdapterPackage, { ParsePushAdapter as _ParsePushAdapter, APNS as _APNS, GCM as _GCM, WEB as _WEB, EXPO as _EXPO, utils } from '../src/index.js'; -const ParsePushAdapter = _ParsePushAdapter; -import { randomString } from '../src/PushAdapterUtils.js'; -import MockAPNProvider from './MockAPNProvider.js'; +import log from 'npmlog'; +import { join } from 'path'; import APNS from '../src/APNS.js'; +import EXPO from '../src/EXPO.js'; +import FCM from '../src/FCM.js'; import GCM from '../src/GCM.js'; +import ParsePushAdapterPackage, { APNS as _APNS, EXPO as _EXPO, GCM as _GCM, ParsePushAdapter as _ParsePushAdapter, WEB as _WEB, utils } from '../src/index.js'; +import { randomString } from '../src/PushAdapterUtils.js'; import WEB from '../src/WEB.js'; -import FCM from '../src/FCM.js'; -import EXPO from '../src/EXPO.js'; +import { wait } from './helper.js'; +import MockAPNProvider from './MockAPNProvider.js'; +const ParsePushAdapter = _ParsePushAdapter; describe('ParsePushAdapter', () => { @@ -642,6 +643,99 @@ describe('ParsePushAdapter', () => { }); }); + + it('throttles push sends per provider', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + queue: { + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, + }, + }, + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + const times = []; + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(() => { + times.push(Date.now()); + return Promise.resolve([]); + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + await Promise.all([ + parsePushAdapter.send({}, installs), + parsePushAdapter.send({}, installs), + ]); + expect(times.length).toBe(2); + expect(times[1] - times[0]).toBeGreaterThanOrEqual(900); + expect(times[1] - times[0]).toBeLessThanOrEqual(1100); + }); + + it('skips queued pushes after ttl expires', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + queue: { + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, + }, + }, + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async () => { + await wait(1_200); + return []; + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + await Promise.all([ + parsePushAdapter.send({}, installs), + parsePushAdapter.send({ queue: { ttl: 1 } }, installs) + ]); + expect(parsePushAdapter.senderMap['android'].send.calls.count()).toBe(1); + }); + + it('sends higher priority pushes before lower priority ones', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + queue: { + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, + }, + }, + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + const callOrder = []; + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async (data) => { + callOrder.push(data.id); + await wait(100); + return []; + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + + // Block queue with task so that the queue scheduler doesn't start processing enqueued items + // immediately; afterwards the scheduler picks the next enqueued item according to priority; + const pBlock = parsePushAdapter.queues.android.enqueue({ task: () => wait(500) }); + // Wait to ensure block item in queue has started + await wait(100); + + await Promise.all([ + pBlock, + parsePushAdapter.send({ id: 3, queue: { priority: 3 }}, installs), + parsePushAdapter.send({ id: 4, queue: { priority: 4 }}, installs), + parsePushAdapter.send({ id: 2, queue: { priority: 2 }}, installs), + parsePushAdapter.send({ id: 0, queue: { priority: 0 }}, installs), + parsePushAdapter.send({ id: 1, queue: { priority: 1 }}, installs), + ]); + expect(callOrder).toEqual([4, 3, 2, 1, 0]); + }); + + it('random string throws with size <=0', () => { expect(() => randomString(0)).toThrow(); }); diff --git a/spec/ThrottleQueue.spec.js b/spec/ThrottleQueue.spec.js new file mode 100644 index 0000000..0307cb1 --- /dev/null +++ b/spec/ThrottleQueue.spec.js @@ -0,0 +1,66 @@ +import ThrottleQueue from '../src/ThrottleQueue.js'; +import { wait } from './helper.js'; + +describe('ThrottleQueue', () => { + it('processes items respecting rate limit', async () => { + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); + const times = []; + const p1 = q.enqueue({ task: () => { times.push(Date.now()) } }); + const p2 = q.enqueue({ task: () => { times.push(Date.now()) } }); + await Promise.all([p1, p2]); + expect(times.length).toBe(2); + expect(times[1] - times[0]).toBeGreaterThanOrEqual(900); + expect(times[1] - times[0]).toBeLessThanOrEqual(1100); + }); + + it('drops expired items', async () => { + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); + const results = []; + const p1 = q.enqueue({ task: () => wait(1_200) }); + const p2 = q.enqueue({ task: () => { throw new Error('should not run'); }, ttl: 1 }); + const p3 = q.enqueue({ task: () => { results.push('run'); } }); + await Promise.all([p1, p2, p3]); + expect(results).toEqual(['run']); + }); + + it('processes higher priority tasks first', async () => { + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); + const results = []; + + // Block queue with task so that the queue scheduler doesn't start processing enqueued items + // immediately; afterwards the scheduler picks the next enqueued item according to priority + const pBlock = q.enqueue({ task: () => wait(500) }); + // Wait to ensure block item in queue has started + await wait(100); + + const p7 = q.enqueue({ task: () => results.push('priority7'), priority: 7 }); + const p4 = q.enqueue({ task: () => results.push('priority4'), priority: 4 }); + const p2 = q.enqueue({ task: () => results.push('priority2'), priority: 2 }); + const p0 = q.enqueue({ task: () => results.push('priority0'), priority: 0 }); + const p6 = q.enqueue({ task: () => results.push('priority6'), priority: 6 }); + const p1 = q.enqueue({ task: () => results.push('priority1'), priority: 1 }); + const p3 = q.enqueue({ task: () => results.push('priority3'), priority: 3 }); + const p5 = q.enqueue({ task: () => results.push('priority5'), priority: 5 }); + await Promise.all([ + pBlock, + p3, + p2, + p4, + p7, + p0, + p6, + p5, + p1, + ]); + expect(results).toEqual([ + 'priority7', + 'priority6', + 'priority5', + 'priority4', + 'priority3', + 'priority2', + 'priority1', + 'priority0', + ]); + }); +}); diff --git a/spec/helper.js b/spec/helper.js index f01497b..e3c445e 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -1,12 +1,19 @@ import { SpecReporter } from 'jasmine-spec-reporter'; -import { fileURLToPath } from 'url'; import { dirname } from 'path'; +import { setTimeout } from 'timers'; +import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); global.__dirname = __dirname; -jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 5000; +jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 10_000; jasmine.getEnv().clearReporters(); jasmine.getEnv().addReporter(new SpecReporter()); + +const wait = (ms) => new Promise(resolve => setTimeout(resolve, ms)); + +export { + wait +}; diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index 5186769..8a8786d 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -7,6 +7,7 @@ import FCM from './FCM.js'; import WEB from './WEB.js'; import EXPO from './EXPO.js'; import { classifyInstallations } from './PushAdapterUtils.js'; +import ThrottleQueue from './ThrottleQueue.js'; const LOG_PREFIX = 'parse-server-push-adapter'; @@ -17,6 +18,7 @@ export default class ParsePushAdapter { constructor(pushConfig = {}) { this.validPushTypes = ['ios', 'osx', 'tvos', 'watchos', 'android', 'fcm', 'web', 'expo']; this.senderMap = {}; + this.queues = {}; // used in PushController for Dashboard Features this.feature = { immediatePush: true @@ -55,6 +57,12 @@ export default class ParsePushAdapter { } break; } + const config = pushConfig[pushType]; + const queue = Array.isArray(config) ? config.find(c => c && c.queue)?.queue : config.queue; + if (queue) { + const { concurrency, intervalCapacity, interval } = queue || {}; + this.queues[pushType] = new ThrottleQueue({ concurrency, intervalCap: intervalCapacity, interval }); + } } } @@ -84,6 +92,9 @@ export default class ParsePushAdapter { }) }); sendPromises.push(Promise.all(results)); + } else if (this.queues[pushType]) { + const { ttl, priority } = data?.queue || {}; + sendPromises.push(this.queues[pushType].enqueue({ task: () => sender.send(data, devices), ttl, priority })); } else { sendPromises.push(sender.send(data, devices)); } diff --git a/src/ThrottleQueue.js b/src/ThrottleQueue.js new file mode 100644 index 0000000..3423b4f --- /dev/null +++ b/src/ThrottleQueue.js @@ -0,0 +1,42 @@ +import PQueue from 'p-queue'; + +export default class ThrottleQueue { + + /** + * Creates an instance of ThrottleQueue. If no parameters are provided, then the queue will have no + * throttling and will process tasks as fast as possible. + * + * @param {Object} [options] The options. + * @param {number} [options.concurrency=Infinity] The maximum number of tasks to process concurrently. + * Optional, defaults to `Infinity`, meaning no limit on concurrency. + * @param {number} [options.intervalCap=Infinity] The interval capacity, meaning the maximum number of + * tasks to process in a given interval. Optional, defaults to `Infinity`, meaning no interval limit. + * @param {number} [options.interval=0] The interval in milliseconds for the interval capacity. + * Optional, defaults to `0`, meaning no interval limit. + */ + constructor({ concurrency = Infinity, intervalCap = Infinity, interval = 0 } = {}) { + this.queue = new PQueue({ concurrency, intervalCap, interval }); + } + + /** + * Enqueue a task to be processed by the throttle queue. + * + * @param {Object} options The options for the task. + * @param {Function} options.task The task to be enqueued. + * @param {number} [options.ttl] The time-to-live for the task in seconds. Optional, if provided, + * the task will only be processed if it is still valid when dequeued. + * @param {number} [options.priority=0] The priority of the task. Optional, defaults to 0. Higher + * priority tasks will be processed before lower priority ones. For example, the value 1 has a + * higher priority than 0. + * @returns {Promise} A promise that resolves when the task is processed. + */ + enqueue({ task, ttl, priority = 0 }) { + const expireAt = ttl ? Date.now() + ttl * 1_000 : null; + return this.queue.add(() => { + if (expireAt && Date.now() > expireAt) { + return null; + } + return task(); + }, { priority }); + } +}