From dd27fe6631a40c911003ca8d0f415c3542e460a0 Mon Sep 17 00:00:00 2001 From: David Whittington Date: Wed, 11 Jun 2025 10:51:31 -0500 Subject: [PATCH] feat: add bypassDataItemFilter flag and convert to object parameters (PE-8173) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rename bypassFilter to bypassBundleFilter for clarity * Add new bypassDataItemFilter flag that only bypasses ANS104_INDEX_FILTER * Convert all queue methods from positional to object parameters * Update system.queueBundle to accept QueueBundleOptions interface * Update DataImporter, Ans104Unbundler, and ANS-104 parser to use object params * Update admin API to support both new and legacy parameter formats * Update data verification worker to use new parameter structure * Update all unit tests to use new object parameter format * Maintain backward compatibility for legacy bypassFilter parameter The new implementation allows independent control of bundle and data item filtering, providing more granular control over the unbundling process. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/lib/ans-104.ts | 15 +++++++-- src/routes/ar-io.ts | 49 ++++++++++++++++++++++------ src/system.ts | 38 +++++++++++++-------- src/workers/ans104-unbundler.test.ts | 15 ++++++--- src/workers/ans104-unbundler.ts | 35 +++++++++++++------- src/workers/data-importer.test.ts | 36 ++++++++++++++++---- src/workers/data-importer.ts | 43 ++++++++++++++++++------ src/workers/data-verification.ts | 37 ++++++++++++--------- 8 files changed, 193 insertions(+), 75 deletions(-) diff --git a/src/lib/ans-104.ts b/src/lib/ans-104.ts index 63481d34..ed10b416 100644 --- a/src/lib/ans-104.ts +++ b/src/lib/ans-104.ts @@ -281,11 +281,13 @@ export class Ans104Parser { parentId, parentIndex, rootParentOffset, + bypassDataItemFilter = false, }: { rootTxId: string; parentId: string; parentIndex: number; rootParentOffset: number; + bypassDataItemFilter?: boolean; }): Promise { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { @@ -347,6 +349,7 @@ export class Ans104Parser { parentIndex, bundlePath, rootParentOffset, + bypassDataItemFilter, }, }); this.drainQueue(); @@ -421,8 +424,14 @@ if (!isMainThread) { process.exit(0); } - const { rootTxId, parentId, parentIndex, bundlePath, rootParentOffset } = - message; + const { + rootTxId, + parentId, + parentIndex, + bundlePath, + rootParentOffset, + bypassDataItemFilter = false, + } = message; let stream: fs.ReadStream | undefined = undefined; try { stream = fs.createReadStream(bundlePath); @@ -478,7 +487,7 @@ if (!isMainThread) { rootParentOffset, }); - if (await filter.match(normalizedDataItem)) { + if (bypassDataItemFilter || (await filter.match(normalizedDataItem))) { matchedItemCount++; parentPort?.postMessage({ eventName: DATA_ITEM_MATCHED, diff --git a/src/routes/ar-io.ts b/src/routes/ar-io.ts index 2f63064c..f15dfc29 100644 --- a/src/routes/ar-io.ts +++ b/src/routes/ar-io.ts @@ -257,20 +257,46 @@ arIoRouter.post( express.json(), async (req, res) => { try { - const { id, bypassFilter = true } = req.body; + const { + id, + bypassBundleFilter = true, + bypassDataItemFilter = false, + // Legacy support + bypassFilter, + } = req.body; if (id === undefined) { res.status(400).send("Must provide 'id'"); return; } - if (bypassFilter !== undefined && typeof bypassFilter !== 'boolean') { - res.status(400).send("'bypassFilter' must be a boolean"); + // Handle legacy bypassFilter parameter + const effectiveBypassBundleFilter = + bypassFilter !== undefined ? bypassFilter : bypassBundleFilter; + + if ( + effectiveBypassBundleFilter !== undefined && + typeof effectiveBypassBundleFilter !== 'boolean' + ) { + // Use legacy error message if using legacy parameter + const errorMessage = + bypassFilter !== undefined + ? "'bypassFilter' must be a boolean" + : "'bypassBundleFilter' must be a boolean"; + res.status(400).send(errorMessage); + return; + } + + if ( + bypassDataItemFilter !== undefined && + typeof bypassDataItemFilter !== 'boolean' + ) { + res.status(400).send("'bypassDataItemFilter' must be a boolean"); return; } - // if byPassFilter is false, then queue like queue-tx - if (bypassFilter === false) { + // if bypassBundleFilter is false, then queue like queue-tx + if (effectiveBypassBundleFilter === false) { system.prioritizedTxIds.add(id); system.txFetcher.queueTxId({ txId: id }); res.json({ message: 'TX queued' }); @@ -284,11 +310,14 @@ arIoRouter.post( return; } - const queuedBundle = await system.queueBundle( - { id, root_tx_id: id } as NormalizedDataItem | PartialJsonTransaction, - true, - bypassFilter, - ); + const queuedBundle = await system.queueBundle({ + item: { id, root_tx_id: id } as + | NormalizedDataItem + | PartialJsonTransaction, + prioritized: true, + bypassBundleFilter: effectiveBypassBundleFilter, + bypassDataItemFilter, + }); if (queuedBundle.error !== undefined) { res.status(503).send(queuedBundle.error); diff --git a/src/system.ts b/src/system.ts index a6ba53e7..11385501 100644 --- a/src/system.ts +++ b/src/system.ts @@ -611,11 +611,19 @@ export type QueueBundleResponse = { status: 'skipped' | 'queued' | 'error'; error?: string; }; -export async function queueBundle( - item: NormalizedDataItem | PartialJsonTransaction, - isPrioritized = false, - bypassFilter = false, -): Promise { +export interface QueueBundleOptions { + item: NormalizedDataItem | PartialJsonTransaction; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; +} + +export async function queueBundle({ + item, + prioritized = false, + bypassBundleFilter = false, + bypassDataItemFilter = false, +}: QueueBundleOptions): Promise { try { if ('root_tx_id' in item && item.root_tx_id === null) { log.debug('Skipping download of optimistically indexed data item', { @@ -632,7 +640,10 @@ export async function queueBundle( format: 'ans-104', }); - if (bypassFilter || (await config.ANS104_UNBUNDLE_FILTER.match(item))) { + if ( + bypassBundleFilter || + (await config.ANS104_UNBUNDLE_FILTER.match(item)) + ) { metrics.bundlesMatchedCounter.inc({ bundle_format: 'ans-104' }); const { unbundleFilterId, @@ -662,17 +673,18 @@ export async function queueBundle( return { status: 'skipped' }; } - bundleDataImporter.queueItem( - { + bundleDataImporter.queueItem({ + item: { ...item, index: 'parent_index' in item && item.parent_index !== undefined ? item.parent_index : -1, // parent indexes are not needed for L1 }, - isPrioritized, - bypassFilter, - ); + prioritized, + bypassBundleFilter, + bypassDataItemFilter, + }); metrics.bundlesQueuedCounter.inc({ bundle_format: 'ans-104' }); } else { await db.saveBundle({ @@ -701,7 +713,7 @@ eventEmitter.on( const isPrioritized = prioritizedTxIds.has(item.id); prioritizedTxIds.delete(item.id); - await queueBundle(item, isPrioritized); + await queueBundle({ item, prioritized: isPrioritized }); }, ); @@ -709,7 +721,7 @@ eventEmitter.on( eventEmitter.on( events.ANS104_NESTED_BUNDLE_INDEXED, async (item: NormalizedDataItem | PartialJsonTransaction) => { - await queueBundle(item, true); + await queueBundle({ item, prioritized: true }); }, ); diff --git a/src/workers/ans104-unbundler.test.ts b/src/workers/ans104-unbundler.test.ts index e42a5aa5..c6833189 100644 --- a/src/workers/ans104-unbundler.test.ts +++ b/src/workers/ans104-unbundler.test.ts @@ -68,7 +68,7 @@ describe('Ans104Unbundler', () => { shouldUnbundleMock.mock.mockImplementation(() => false); for (let i = 0; i < 10; i++) { - ans104Unbundler.queueItem(mockItem, false); + ans104Unbundler.queueItem({ item: mockItem, prioritized: false }); } assert.equal(shouldUnbundleMock.mock.calls.length, 10); @@ -77,7 +77,7 @@ describe('Ans104Unbundler', () => { it('should queue item when shouldUnbundle returns true', async () => { for (let i = 0; i < 10; i++) { - ans104Unbundler.queueItem(mockItem, false); + ans104Unbundler.queueItem({ item: mockItem, prioritized: false }); } assert.equal(shouldUnbundleMock.mock.calls.length, 10); @@ -88,7 +88,7 @@ describe('Ans104Unbundler', () => { shouldUnbundleMock.mock.mockImplementation(() => false); for (let i = 0; i < 10; i++) { - ans104Unbundler.queueItem(mockItem, true); + ans104Unbundler.queueItem({ item: mockItem, prioritized: true }); } assert.equal(shouldUnbundleMock.mock.calls.length, 10); @@ -99,7 +99,7 @@ describe('Ans104Unbundler', () => { ans104Unbundler['workerCount'] = 0; for (let i = 0; i < 10; i++) { - ans104Unbundler.queueItem(mockItem, false); + ans104Unbundler.queueItem({ item: mockItem, prioritized: false }); } assert.equal(shouldUnbundleMock.mock.calls.length, 0); @@ -115,7 +115,11 @@ describe('Ans104Unbundler', () => { root_tx_id: 'root_tx_id', } as UnbundleableItem; - await ans104Unbundler.queueItem(mockItem, false, true); + await ans104Unbundler.queueItem({ + item: mockItem, + prioritized: false, + bypassBundleFilter: true, + }); assert.deepEqual( (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0], @@ -124,6 +128,7 @@ describe('Ans104Unbundler', () => { parentIndex: undefined, rootParentOffset: 0, rootTxId: 'root_tx_id', + bypassDataItemFilter: false, }, ); }); diff --git a/src/workers/ans104-unbundler.ts b/src/workers/ans104-unbundler.ts index 10c6b105..1edc1784 100644 --- a/src/workers/ans104-unbundler.ts +++ b/src/workers/ans104-unbundler.ts @@ -53,7 +53,11 @@ export class Ans104Unbundler { private workerCount: number; private maxQueueSize: number; private queue: queueAsPromised< - { item: UnbundleableItem; bypassFilter: boolean }, + { + item: UnbundleableItem; + bypassBundleFilter: boolean; + bypassDataItemFilter: boolean; + }, void >; private shouldUnbundle: () => boolean; @@ -103,11 +107,17 @@ export class Ans104Unbundler { this.shouldUnbundle = shouldUnbundle; } - async queueItem( - item: UnbundleableItem, - prioritized: boolean | undefined, - bypassFilter = false, - ): Promise { + async queueItem({ + item, + prioritized, + bypassBundleFilter = false, + bypassDataItemFilter = false, + }: { + item: UnbundleableItem; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; + }): Promise { const log = this.log.child({ method: 'queueItem', id: item.id }); if (this.workerCount === 0) { @@ -122,11 +132,11 @@ export class Ans104Unbundler { if (prioritized === true) { log.debug('Queueing prioritized bundle...'); - this.queue.unshift({ item, bypassFilter }); + this.queue.unshift({ item, bypassBundleFilter, bypassDataItemFilter }); log.debug('Prioritized bundle queued.'); } else if (this.queue.length() < this.maxQueueSize) { log.debug('Queueing bundle...'); - this.queue.push({ item, bypassFilter }); + this.queue.push({ item, bypassBundleFilter, bypassDataItemFilter }); log.debug('Bundle queued.'); } else { log.debug('Skipping unbundle, queue is full.'); @@ -135,10 +145,12 @@ export class Ans104Unbundler { async unbundle({ item, - bypassFilter, + bypassBundleFilter, + bypassDataItemFilter, }: { item: UnbundleableItem; - bypassFilter: boolean; + bypassBundleFilter: boolean; + bypassDataItemFilter: boolean; }): Promise { const log = this.log.child({ method: 'unbundle', id: item.id }); try { @@ -153,7 +165,7 @@ export class Ans104Unbundler { // Data item without root_tx_id (should be impossible) throw new Error('Missing root_tx_id on data item.'); } - if (bypassFilter || (await this.filter.match(item))) { + if (bypassBundleFilter || (await this.filter.match(item))) { log.info('Unbundling bundle...'); let rootParentOffset = 0; @@ -171,6 +183,7 @@ export class Ans104Unbundler { parentId: item.id, parentIndex: item.index, rootParentOffset, + bypassDataItemFilter, }); log.info('Bundle unbundled.'); } diff --git a/src/workers/data-importer.test.ts b/src/workers/data-importer.test.ts index 49440e57..71c888ef 100644 --- a/src/workers/data-importer.test.ts +++ b/src/workers/data-importer.test.ts @@ -31,7 +31,12 @@ import { ContiguousDataSource } from '../types.js'; import { DataImporter } from './data-importer.js'; class Ans104UnbundlerStub { - async queueItem(): Promise { + async queueItem(_options: { + item: any; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; + }): Promise { return; } @@ -100,7 +105,10 @@ describe('DataImporter', () => { it('should queue a non-prioritized item if queue is not full', async () => { mock.method(contiguousDataSource, 'getData'); - await bundleDataImporter.queueItem(mockItem, false); + await bundleDataImporter.queueItem({ + item: mockItem, + prioritized: false, + }); assert.deepEqual( (contiguousDataSource.getData as any).mock.calls[0].arguments[0], @@ -111,7 +119,10 @@ describe('DataImporter', () => { it('should not queue a non-prioritized item if queue is full', async () => { mock.method(contiguousDataSource, 'getData'); - await bundleDataImporterWithFullQueue.queueItem(mockItem, false); + await bundleDataImporterWithFullQueue.queueItem({ + item: mockItem, + prioritized: false, + }); assert.equal((contiguousDataSource.getData as any).mock.callCount(), 0); }); @@ -119,7 +130,7 @@ describe('DataImporter', () => { it('should queue a prioritized item if the queue is not full', async () => { mock.method(contiguousDataSource, 'getData'); - await bundleDataImporter.queueItem(mockItem, true); + await bundleDataImporter.queueItem({ item: mockItem, prioritized: true }); assert.deepEqual( (contiguousDataSource.getData as any).mock.calls[0].arguments[0], @@ -130,7 +141,10 @@ describe('DataImporter', () => { it('should queue a prioritized item if the queue is full', async () => { mock.method(contiguousDataSource, 'getData'); - await bundleDataImporterWithFullQueue.queueItem(mockItem, true); + await bundleDataImporterWithFullQueue.queueItem({ + item: mockItem, + prioritized: true, + }); assert.deepEqual( (contiguousDataSource.getData as any).mock.calls[0].arguments[0], @@ -153,12 +167,20 @@ describe('DataImporter', () => { await bundleDataImporter.download({ item: mockItem, prioritized: true, - bypassFilter: false, + bypassBundleFilter: false, + bypassDataItemFilter: false, }); assert.deepEqual( (ans104Unbundler.queueItem as any).mock.calls[0].arguments, - [mockItem, true, false], + [ + { + item: mockItem, + prioritized: true, + bypassBundleFilter: false, + bypassDataItemFilter: false, + }, + ], ); }); diff --git a/src/workers/data-importer.ts b/src/workers/data-importer.ts index 94f1390b..a9008864 100644 --- a/src/workers/data-importer.ts +++ b/src/workers/data-importer.ts @@ -40,7 +40,8 @@ type ImportableItem = AnyContiguousData | UnbundleableItem; interface DataImporterQueueItem { item: ImportableItem; prioritized: boolean | undefined; - bypassFilter: boolean; + bypassBundleFilter: boolean; + bypassDataItemFilter: boolean; } export class DataImporter { @@ -80,11 +81,17 @@ export class DataImporter { ); } - async queueItem( - item: ImportableItem, - prioritized: boolean | undefined, - bypassFilter = false, - ): Promise { + async queueItem({ + item, + prioritized, + bypassBundleFilter = false, + bypassDataItemFilter = false, + }: { + item: ImportableItem; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; + }): Promise { const log = this.log.child({ method: 'queueItem', id: item.id }); if (this.workerCount === 0) { log.debug('Skipping contiguous-data download, no workers.'); @@ -93,11 +100,21 @@ export class DataImporter { if (prioritized === true) { log.debug('Queueing prioritized contiguous data download...'); - this.queue.unshift({ item, prioritized, bypassFilter }); + this.queue.unshift({ + item, + prioritized, + bypassBundleFilter, + bypassDataItemFilter, + }); log.debug('Prioritized contiguous data download queued.'); } else if (this.queue.length() < this.maxQueueSize) { log.debug('Queueing contiguous data download...'); - this.queue.push({ item, prioritized, bypassFilter }); + this.queue.push({ + item, + prioritized, + bypassBundleFilter, + bypassDataItemFilter, + }); log.debug('Contiguous data download queued.'); } else { log.debug('Skipping contiguous data download, queue is full.'); @@ -107,7 +124,8 @@ export class DataImporter { async download({ item, prioritized, - bypassFilter, + bypassBundleFilter, + bypassDataItemFilter, }: DataImporterQueueItem): Promise { const log = this.log.child({ method: 'download', id: item.id }); @@ -118,7 +136,12 @@ export class DataImporter { const hasIndexProperty = this.hasIndexPropery(item); if (this.ans104Unbundler && hasIndexProperty) { log.debug('Data download completed. Queuing for unbundling...'); - this.ans104Unbundler.queueItem(item, prioritized, bypassFilter); + this.ans104Unbundler.queueItem({ + item, + prioritized, + bypassBundleFilter, + bypassDataItemFilter, + }); } else { log.debug( hasIndexProperty diff --git a/src/workers/data-verification.ts b/src/workers/data-verification.ts index 07f9f3e3..e7e33fd0 100644 --- a/src/workers/data-verification.ts +++ b/src/workers/data-verification.ts @@ -43,11 +43,12 @@ export class DataVerificationWorker { private dataRootComputer: DataRootComputer; private dataImporter: DataImporter | undefined; private queueBundle: - | (( - item: NormalizedDataItem | PartialJsonTransaction, - isPrioritized: boolean, - bypassFilter: boolean, - ) => Promise) + | ((options: { + item: NormalizedDataItem | PartialJsonTransaction; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; + }) => Promise) | undefined; private workerCount: number; @@ -71,11 +72,12 @@ export class DataVerificationWorker { dataItemRootTxIndex: DataItemRootTxIndex; contiguousDataSource: ContiguousDataSource; dataImporter?: DataImporter; - queueBundle?: ( - item: NormalizedDataItem | PartialJsonTransaction, - isPrioritized: boolean, - bypassFilter: boolean, - ) => Promise; + queueBundle?: (options: { + item: NormalizedDataItem | PartialJsonTransaction; + prioritized?: boolean; + bypassBundleFilter?: boolean; + bypassDataItemFilter?: boolean; + }) => Promise; workerCount?: number; streamTimeout?: number; interval?: number; @@ -154,13 +156,13 @@ export class DataVerificationWorker { // TODO: consider using bundle index to make this determination if (this.queueBundle && dataAttributes?.hash === undefined) { log.verbose('Root bundle has not been unbundled, queuing...'); - await this.queueBundle( - { id, root_tx_id: id } as + await this.queueBundle({ + item: { id, root_tx_id: id } as | NormalizedDataItem | PartialJsonTransaction, - true, - true, - ); // isPrioritized: true, bypassFilter: true + prioritized: true, + bypassBundleFilter: true, + }); } else { return false; } @@ -189,7 +191,10 @@ export class DataVerificationWorker { log.verbose( 'Computed data root mismatch, queueing for root bundle download from chunks....', ); - await this.dataImporter.queueItem({ id }, true); + await this.dataImporter.queueItem({ + item: { id }, + prioritized: true, + }); } return false;