From 15360977a1aeb4f7bc5ecebc76e3b5784498e8f6 Mon Sep 17 00:00:00 2001 From: Atticus Date: Fri, 13 Jun 2025 15:12:32 -0700 Subject: [PATCH 1/5] Refactor sync to stream batches --- .../domain/repositories/sync_repository.dart | 372 ++++++++---------- lib/sync/utils/batch_processor.dart | 23 ++ test/sync/utils/batch_processor_test.dart | 40 ++ 3 files changed, 238 insertions(+), 197 deletions(-) diff --git a/lib/sync/domain/repositories/sync_repository.dart b/lib/sync/domain/repositories/sync_repository.dart index d21389eb47..8fb4190cd4 100644 --- a/lib/sync/domain/repositories/sync_repository.dart +++ b/lib/sync/domain/repositories/sync_repository.dart @@ -568,12 +568,8 @@ class _SyncRepository implements SyncRepository { } } } - final fetchPhaseStartDT = DateTime.now(); - logger.d('Fetching all transactions for drive ${drive.id}'); - final transactions = []; - List snapshotItems = []; if (_configService.config.enableSyncFromSnapshot) { @@ -633,79 +629,9 @@ class _SyncRepository implements SyncRepository { final transactionsStream = driveHistory.getNextStream(); - /// The first block height of this drive. - int? firstBlockHeight; - - /// In order to measure the sync progress by the block height, we use the difference - /// between the first block and the `currentBlockHeight` - late int totalBlockHeightDifference; - - /// This percentage is based on block heights. - var fetchPhasePercentage = 0.0; - - /// First phase of the sync - /// Here we get all transactions from its drive. - await for (DriveEntityHistoryTransactionModel t in transactionsStream) { - double calculatePercentageBasedOnBlockHeights() { - final block = t.transactionCommonMixin.block; - - if (block != null) { - return (1 - - ((currentBlockHeight - block.height) / - totalBlockHeightDifference)); - } - - /// if the block is null, we don't calculate and keep the same percentage - return fetchPhasePercentage; - } - - /// Initialize only once `firstBlockHeight` and `totalBlockHeightDifference` - if (firstBlockHeight == null) { - final block = t.transactionCommonMixin.block; - - if (block != null) { - firstBlockHeight = block.height; - totalBlockHeightDifference = currentBlockHeight - firstBlockHeight; - logger.d( - 'First height: $firstBlockHeight, totalHeightDiff: $totalBlockHeightDifference', - ); - } else { - logger.d( - 'The transaction block is null. Transaction node id: ${t.transactionCommonMixin.id}', - ); - } - } - - transactions.add(t); - - /// We can only calculate the fetch percentage if we have the `firstBlockHeight` - if (firstBlockHeight != null) { - if (totalBlockHeightDifference > 0) { - fetchPhasePercentage = calculatePercentageBasedOnBlockHeights(); - } else { - // If the difference is zero means that the first phase was concluded. - logger.d('The syncs first phase just finished!'); - fetchPhasePercentage = 1; - } - final percentage = - calculatePercentageBasedOnBlockHeights() * fetchPhaseWeight; - yield percentage; - } - } - - logger.d('Done fetching data - ${gqlDriveHistory.driveId}'); - - txFechedCallback?.call(drive.id, gqlDriveHistory.txCount); - - final fetchPhaseTotalTime = - DateTime.now().difference(fetchPhaseStartDT).inMilliseconds; - - logger.d( - 'Duration of fetch phase for ${drive.name}: $fetchPhaseTotalTime ms. Progress by block height: $fetchPhasePercentage%. 
Starting parse phase');
-
     try {
       yield* _parseDriveTransactionsIntoDatabaseEntities(
-        transactions: transactions,
+        transactions: transactionsStream,
         drive: drive,
         driveKey: driveKey?.key,
         currentBlockHeight: currentBlockHeight,
         lastBlockHeight: lastBlockHeight,
         batchSize: transactionParseBatchSize,
         snapshotDriveHistory: snapshotDriveHistory,
         ownerAddress: ownerAddress,
-      ).map(
-        (parseProgress) => parseProgress * 0.9,
       );
     } catch (e) {
       logger.e('[Sync Drive] Error while parsing transactions', e);

     final syncDriveTotalTime =
         DateTime.now().difference(startSyncDT).inMilliseconds;

-    final averageBetweenFetchAndGet = fetchPhaseTotalTime / syncDriveTotalTime;
-
-    logger.i(
-        'Drive ${drive.name} completed parse phase. Progress by block height: $fetchPhasePercentage%. Starting parse phase. Sync duration: $syncDriveTotalTime ms. Fetching used ${(averageBetweenFetchAndGet * 100).toStringAsFixed(2)}% of drive sync process');
+    logger.i('Drive ${drive.name} sync completed in $syncDriveTotalTime ms');
   }

   Future<void> _updateLicenses({

   /// Process the transactions from the first phase into database entities.
   /// This is done in batches to improve performance and provide more granular progress
   Stream<double> _parseDriveTransactionsIntoDatabaseEntities({
-    required List<DriveEntityHistoryTransactionModel> transactions,
+    required Stream<DriveEntityHistoryTransactionModel> transactions,
     required Drive drive,
     required SecretKey? driveKey,
     required int lastBlockHeight,
     required int currentBlockHeight,
     required int batchSize,
     required SnapshotDriveHistory snapshotDriveHistory,
-    // required Map ghostFolders,
     required String ownerAddress,
-  }) async* {
-    final numberOfDriveEntitiesToParse = transactions.length;
-    var numberOfDriveEntitiesParsed = 0;
-
-    double driveEntityParseProgress() =>
-        numberOfDriveEntitiesParsed / numberOfDriveEntitiesToParse;
-
-    if (transactions.isEmpty) {
-      await _driveDao.writeToDrive(
-        DrivesCompanion(
-          id: Value(drive.id),
-          lastBlockHeight: Value(currentBlockHeight),
-          syncCursor: const Value(null),
-        ),
-      );
+  }) {
+    final controller = StreamController<double>();

-      /// If there's nothing to sync, we assume that all were synced
+    () async {
+      var numberOfDriveEntitiesToParse = 0;
+      var numberOfDriveEntitiesParsed = 0;

-      yield 1;
-      return;
-    }
+      double driveEntityParseProgress() => numberOfDriveEntitiesToParse == 0
+          ? 0
+          : numberOfDriveEntitiesParsed / numberOfDriveEntitiesToParse;

-    logger.d(
-      'no. of entities in drive with id ${drive.id} to be parsed are: $numberOfDriveEntitiesToParse\n',
-    );
+      final batch = <DriveEntityHistoryTransactionModel>[];
+
+      int? firstBlockHeight;
+      late int totalBlockHeightDifference;
+      var fetchPhasePercentage = 0.0;

-    yield* _batchProcessor.batchProcess(
-      list: transactions,
-      batchSize: batchSize,
-      endOfBatchCallback: (items) async* {
-        final entityHistory =
-            await _arweave.createDriveEntityHistoryFromTransactions(
-          items,
+      double calculatePercentageBasedOnBlockHeights(
+          DriveEntityHistoryTransactionModel t) {
+        final block = t.transactionCommonMixin.block;
+        if (block != null) {
+          return (1 -
+              ((currentBlockHeight - block.height) /
+                  totalBlockHeightDifference));
+        }
+        return fetchPhasePercentage;
+      }
+
+      await for (final t in transactions) {
+        if (firstBlockHeight == null) {
+          final block = t.transactionCommonMixin.block;
+          if (block != null) {
+            firstBlockHeight = block.height;
+            totalBlockHeightDifference = currentBlockHeight - firstBlockHeight!;
+          }
+        }
+
+        batch.add(t);
+        numberOfDriveEntitiesToParse++;
+
+        if (firstBlockHeight != null) {
+          if (totalBlockHeightDifference > 0) {
+            fetchPhasePercentage = calculatePercentageBasedOnBlockHeights(t);
+          } else {
+            fetchPhasePercentage = 1;
+          }
+          controller.add(fetchPhasePercentage * fetchPhaseWeight);
+        }
+
+        if (batch.length >= batchSize) {
+          await _processTransactionBatch(
+            batch,
+            drive,
             driveKey,
             lastBlockHeight,
-          driveId: drive.id,
-          ownerAddress: ownerAddress,
+            currentBlockHeight,
+            batchSize,
+            ownerAddress,
           );
+          numberOfDriveEntitiesParsed += batch.length;
+          controller.add(fetchPhaseWeight +
+              driveEntityParseProgress() * parsePhaseWeight);
+          batch.clear();
+        }
+      }
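
The loop above folds the old fetch and parse phases into a single pass: every incoming transaction updates the block-height estimate, and every flushed batch advances the parse count, so one weighted figure covers both phases. A minimal sketch of that weighting, assuming fetchPhaseWeight and parsePhaseWeight are constants that sum to 1 (the 0.1/0.9 values below are illustrative guesses suggested by the removed '* 0.9' mapping; the real constants live in the repository):

  // Assumed weight values, for illustration only.
  const fetchPhaseWeight = 0.1;
  const parsePhaseWeight = 0.9;

  // Emitted while transactions are still arriving: the block-height
  // fraction scaled into the fetch phase's share of the progress bar.
  double fetchProgress(double blockHeightFraction) =>
      blockHeightFraction * fetchPhaseWeight;

  // Emitted after each batch is written: the fetch share counts as
  // earned, and the parse fraction fills the remaining share.
  double parseProgress(int parsed, int toParse) =>
      fetchPhaseWeight +
      (toParse == 0 ? 0 : parsed / toParse) * parsePhaseWeight;
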
-        // Create entries for all the new revisions of file and folders in this drive.
-        final newEntities = entityHistory.blockHistory
-            .map((b) => b.entities)
-            .expand((entities) => entities);
-
-        numberOfDriveEntitiesParsed += items.length - newEntities.length;
-
-        yield driveEntityParseProgress();
-
-        // Handle the last page of newEntities, i.e; There's nothing more to sync
-        if (newEntities.length < batchSize) {
-          // Reset the sync cursor after every sync to pick up files from other instances of the app.
-          // (Different tab, different window, mobile, desktop etc)
-          await _driveDao.writeToDrive(DrivesCompanion(
-            id: Value(drive.id),
-            lastBlockHeight: Value(currentBlockHeight),
-            syncCursor: const Value(null),
-            isHidden: Value(drive.isHidden),
-          ));
-        }
+      if (batch.isNotEmpty) {
+        await _processTransactionBatch(
+          batch,
+          drive,
+          driveKey,
+          lastBlockHeight,
+          currentBlockHeight,
+          batchSize,
+          ownerAddress,
+        );
+        numberOfDriveEntitiesParsed += batch.length;
+        controller.add(fetchPhaseWeight +
+            driveEntityParseProgress() * parsePhaseWeight);
+        batch.clear();
+      }

-        await _driveDao.runTransaction(() async {
-          final latestDriveRevision = await _addNewDriveEntityRevisions(
-            newEntities: newEntities.whereType<DriveEntity>(),
-          );
-          final latestFolderRevisions = await _addNewFolderEntityRevisions(
-            driveId: drive.id,
-            newEntities: newEntities.whereType<FolderEntity>(),
-          );
-          final latestFileRevisions = await _addNewFileEntityRevisions(
-            driveId: drive.id,
-            newEntities: newEntities.whereType<FileEntity>(),
-          );
+      if (numberOfDriveEntitiesToParse == 0) {
+        await _driveDao.writeToDrive(
+          DrivesCompanion(
+            id: Value(drive.id),
+            lastBlockHeight: Value(currentBlockHeight),
+            syncCursor: const Value(null),
+          ),
+        );
+        controller.add(1);
+      } else {
+        controller.add(fetchPhaseWeight + parsePhaseWeight);
+      }

-          for (final entity in latestFileRevisions) {
-            if (!_folderIds.contains(entity.parentFolderId.value)) {
-              _ghostFolders.putIfAbsent(
-                entity.parentFolderId.value,
-                () => GhostFolder(
-                  driveId: drive.id,
-                  folderId: entity.parentFolderId.value,
-                  isHidden: false,
-                ),
-              );
-            }
-          }
+      controller.close();

-          // Check and handle cases where there's no more revisions
-          final updatedDrive = latestDriveRevision != null
-              ? await _computeRefreshedDriveFromRevision(
-                  driveDao: _driveDao,
-                  latestRevision: latestDriveRevision,
-                )
-              : null;
+      logger.i(
+          'drive: ${drive.id} sync completed. no. of transactions to be parsed into entities: $numberOfDriveEntitiesToParse. no. of parsed entities: $numberOfDriveEntitiesParsed');
+    }();

-          final updatedFoldersById =
-              await _computeRefreshedFolderEntriesFromRevisions(
-            driveDao: _driveDao,
-            driveId: drive.id,
-            revisionsByFolderId: latestFolderRevisions,
-          );
-          final updatedFilesById =
-              await _computeRefreshedFileEntriesFromRevisions(
-            driveDao: _driveDao,
-            driveId: drive.id,
-            revisionsByFileId: latestFileRevisions,
-          );
+    return controller.stream;
+  }
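
The method above only accumulates transactions and reports progress; each full batch, plus the remainder once the stream closes, is handed to _processTransactionBatch below. This is the same accumulate-and-flush shape this patch also adds to BatchProcessor as batchProcessStream (later dropped again in patch 5). A self-contained sketch of the pattern, with a hypothetical process callback standing in for the batch consumer:

  import 'dart:async';

  /// Buffers [source] items and hands them to [process] in batches of
  /// [batchSize], flushing whatever remains once the stream closes.
  Future<void> flushInBatches<T>(
    Stream<T> source,
    int batchSize,
    Future<void> Function(List<T> batch) process,
  ) async {
    final batch = <T>[];
    await for (final item in source) {
      batch.add(item);
      if (batch.length >= batchSize) {
        await process(List.of(batch)); // copy so the callee owns the list
        batch.clear();
      }
    }
    if (batch.isNotEmpty) {
      await process(List.of(batch)); // final partial batch
    }
  }
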
-          numberOfDriveEntitiesParsed += newEntities.length;
+  Future<void> _processTransactionBatch(
+    List<DriveEntityHistoryTransactionModel> items,
+    Drive drive,
+    SecretKey? driveKey,
+    int lastBlockHeight,
+    int currentBlockHeight,
+    int batchSize,
+    String ownerAddress,
+  ) async {
+    final entityHistory =
+        await _arweave.createDriveEntityHistoryFromTransactions(
+      items,
+      driveKey,
+      lastBlockHeight,
+      driveId: drive.id,
+      ownerAddress: ownerAddress,
+    );

-          numberOfDriveEntitiesParsed -=
-              updatedFoldersById.length + updatedFilesById.length;
+    final newEntities = entityHistory.blockHistory
+        .map((b) => b.entities)
+        .expand((entities) => entities);
+
+    if (newEntities.length < batchSize) {
+      await _driveDao.writeToDrive(DrivesCompanion(
+        id: Value(drive.id),
+        lastBlockHeight: Value(currentBlockHeight),
+        syncCursor: const Value(null),
+        isHidden: Value(drive.isHidden),
+      ));
+    }

-          // Update the drive model, making sure to not overwrite the existing keys defined on the drive.
-          if (updatedDrive != null) {
-            await _driveDao.updateDrive(updatedDrive);
-          }
+    await _driveDao.runTransaction(() async {
+      final latestDriveRevision = await _addNewDriveEntityRevisions(
+        newEntities: newEntities.whereType<DriveEntity>(),
+      );
+      final latestFolderRevisions = await _addNewFolderEntityRevisions(
+        driveId: drive.id,
+        newEntities: newEntities.whereType<FolderEntity>(),
+      );
+      final latestFileRevisions = await _addNewFileEntityRevisions(
+        driveId: drive.id,
+        newEntities: newEntities.whereType<FileEntity>(),
+      );

-          // Update the folder and file entries before generating their new paths.
-          await _driveDao
-              .updateFolderEntries(updatedFoldersById.values.toList());
-          await _driveDao.updateFileEntries(updatedFilesById.values.toList());
+      for (final entity in latestFileRevisions) {
+        if (!_folderIds.contains(entity.parentFolderId.value)) {
+          _ghostFolders.putIfAbsent(
+            entity.parentFolderId.value,
+            () => GhostFolder(
+              driveId: drive.id,
+              folderId: entity.parentFolderId.value,
+              isHidden: false,
+            ),
+          );
+        }
+      }

-          numberOfDriveEntitiesParsed +=
-              updatedFoldersById.length + updatedFilesById.length;
+      final updatedDrive = latestDriveRevision != null
+          ? await _computeRefreshedDriveFromRevision(
+              driveDao: _driveDao,
+              latestRevision: latestDriveRevision,
+            )
+          : null;

-          latestFolderRevisions.clear();
-          latestFileRevisions.clear();
-        });
-        yield driveEntityParseProgress();
-      });
+      final updatedFoldersById =
+          await _computeRefreshedFolderEntriesFromRevisions(
+        driveDao: _driveDao,
+        driveId: drive.id,
+        revisionsByFolderId: latestFolderRevisions,
+      );
+      final updatedFilesById = await _computeRefreshedFileEntriesFromRevisions(
+        driveDao: _driveDao,
+        driveId: drive.id,
+        revisionsByFileId: latestFileRevisions,
+      );

-    logger.i(
-        'drive: ${drive.id} sync completed. no. of transactions to be parsed into entities: $numberOfDriveEntitiesToParse. no. 
of parsed entities: $numberOfDriveEntitiesParsed'); + if (updatedDrive != null) { + await _driveDao.updateDrive(updatedDrive); + } + + await _driveDao.updateFolderEntries(updatedFoldersById.values.toList()); + await _driveDao.updateFileEntries(updatedFilesById.values.toList()); + + latestFolderRevisions.clear(); + latestFileRevisions.clear(); + }); + + return; } /// Computes the new drive revisions from the provided entities, inserts them into the database, diff --git a/lib/sync/utils/batch_processor.dart b/lib/sync/utils/batch_processor.dart index 8c15f526b9..0f80331db3 100644 --- a/lib/sync/utils/batch_processor.dart +++ b/lib/sync/utils/batch_processor.dart @@ -29,4 +29,27 @@ class BatchProcessor { list.clear(); } + + Stream batchProcessStream({ + required Stream stream, + required Stream Function(List items) endOfBatchCallback, + required int batchSize, + }) async* { + if (batchSize <= 0) { + throw ArgumentError('Batch size cannot be 0'); + } + + final buffer = []; + await for (final item in stream) { + buffer.add(item); + if (buffer.length >= batchSize) { + yield* endOfBatchCallback(List.from(buffer)); + buffer.clear(); + } + } + + if (buffer.isNotEmpty) { + yield* endOfBatchCallback(List.from(buffer)); + } + } } diff --git a/test/sync/utils/batch_processor_test.dart b/test/sync/utils/batch_processor_test.dart index aa635ddcf5..ccf0c25d30 100644 --- a/test/sync/utils/batch_processor_test.dart +++ b/test/sync/utils/batch_processor_test.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:ardrive/sync/utils/batch_processor.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:mocktail/mocktail.dart'; @@ -18,6 +20,44 @@ void main() { }); }); + group('batchProcessStream', () { + test('should produce no output for an empty stream', () { + final processor = BatchProcessor(); + final controller = StreamController(); + controller.close(); + + expect( + processor.batchProcessStream( + stream: controller.stream, + endOfBatchCallback: mockCallback.call, + batchSize: 5), + emitsDone); + }); + + test('should split stream into batches', () { + final processor = BatchProcessor(); + final controller = StreamController(); + + when(() => mockCallback(any())).thenAnswer((_) async* { + yield 1.0; + }); + + final stream = processor.batchProcessStream( + stream: controller.stream, + endOfBatchCallback: mockCallback.call, + batchSize: 3, + ); + + controller.add(1); + controller.add(2); + controller.add(3); + controller.add(4); + controller.close(); + + expect(stream, emitsInOrder([1.0, 1.0, emitsDone])); + }); + }); + test('should produce no output for an empty list', () { final processor = BatchProcessor(); expect( From 27a64039f4576e4644c3431ea3724245e189043e Mon Sep 17 00:00:00 2001 From: Atticus Date: Fri, 13 Jun 2025 15:32:42 -0700 Subject: [PATCH 2/5] Remove unused batch processor and fix lint --- lib/main.dart | 1 - lib/sync/domain/repositories/sync_repository.dart | 8 +------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/main.dart b/lib/main.dart index e5fc285436..3462d48346 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -477,7 +477,6 @@ class AppState extends State { configService: configService, driveDao: _.read(), licenseService: _.read(), - batchProcessor: BatchProcessor(), snapshotValidationService: SnapshotValidationService( configService: configService, ), diff --git a/lib/sync/domain/repositories/sync_repository.dart b/lib/sync/domain/repositories/sync_repository.dart index 8fb4190cd4..c855f9da5c 100644 --- 
a/lib/sync/domain/repositories/sync_repository.dart +++ b/lib/sync/domain/repositories/sync_repository.dart @@ -27,7 +27,6 @@ import 'package:ardrive/sync/data/snapshot_validation_service.dart'; import 'package:ardrive/sync/domain/ghost_folder.dart'; import 'package:ardrive/sync/domain/models/drive_entity_history.dart'; import 'package:ardrive/sync/domain/sync_progress.dart'; -import 'package:ardrive/sync/utils/batch_processor.dart'; import 'package:ardrive/sync/utils/network_transaction_utils.dart'; import 'package:ardrive/user/repositories/user_preferences_repository.dart'; import 'package:ardrive/utils/logger.dart'; @@ -93,7 +92,6 @@ abstract class SyncRepository { required DriveDao driveDao, required ConfigService configService, required LicenseService licenseService, - required BatchProcessor batchProcessor, required SnapshotValidationService snapshotValidationService, required ARNSRepository arnsRepository, required UserPreferencesRepository userPreferencesRepository, @@ -103,7 +101,6 @@ abstract class SyncRepository { driveDao: driveDao, configService: configService, licenseService: licenseService, - batchProcessor: batchProcessor, snapshotValidationService: snapshotValidationService, arnsRepository: arnsRepository, userPreferencesRepository: userPreferencesRepository, @@ -116,7 +113,6 @@ class _SyncRepository implements SyncRepository { final DriveDao _driveDao; final ConfigService _configService; final LicenseService _licenseService; - final BatchProcessor _batchProcessor; final SnapshotValidationService _snapshotValidationService; final ARNSRepository _arnsRepository; final UserPreferencesRepository _userPreferencesRepository; @@ -131,7 +127,6 @@ class _SyncRepository implements SyncRepository { required DriveDao driveDao, required ConfigService configService, required LicenseService licenseService, - required BatchProcessor batchProcessor, required SnapshotValidationService snapshotValidationService, required ARNSRepository arnsRepository, required UserPreferencesRepository userPreferencesRepository, @@ -140,7 +135,6 @@ class _SyncRepository implements SyncRepository { _configService = configService, _licenseService = licenseService, _snapshotValidationService = snapshotValidationService, - _batchProcessor = batchProcessor, _userPreferencesRepository = userPreferencesRepository, _arnsRepository = arnsRepository; @@ -771,7 +765,7 @@ class _SyncRepository implements SyncRepository { final block = t.transactionCommonMixin.block; if (block != null) { firstBlockHeight = block.height; - totalBlockHeightDifference = currentBlockHeight - firstBlockHeight!; + totalBlockHeightDifference = currentBlockHeight - firstBlockHeight; } } From 30452542b8b1f1b9269fdb0414adb981471a3ed6 Mon Sep 17 00:00:00 2001 From: atticusofsparta Date: Mon, 16 Jun 2025 09:30:51 -0600 Subject: [PATCH 3/5] fix(lint): remove unused import --- lib/main.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/main.dart b/lib/main.dart index 3462d48346..4e87859493 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -26,7 +26,6 @@ import 'package:ardrive/shared/blocs/banner/app_banner_bloc.dart'; import 'package:ardrive/sharing/blocs/sharing_file_bloc.dart'; import 'package:ardrive/sync/data/snapshot_validation_service.dart'; import 'package:ardrive/sync/domain/repositories/sync_repository.dart'; -import 'package:ardrive/sync/utils/batch_processor.dart'; import 'package:ardrive/theme/theme_switcher_bloc.dart'; import 'package:ardrive/theme/theme_switcher_state.dart'; import 
'package:ardrive/turbo/services/payment_service.dart'; From f485cb111538bcb026bc90f33b309ed09e83f154 Mon Sep 17 00:00:00 2001 From: Atticus Date: Mon, 16 Jun 2025 10:21:28 -0700 Subject: [PATCH 4/5] Stream parsed entities --- lib/services/arweave/arweave_service.dart | 77 +++++++++++++++++++ .../domain/repositories/sync_repository.dart | 62 ++++++++++++--- 2 files changed, 127 insertions(+), 12 deletions(-) diff --git a/lib/services/arweave/arweave_service.dart b/lib/services/arweave/arweave_service.dart index fea15fb378..bca2c46cac 100644 --- a/lib/services/arweave/arweave_service.dart +++ b/lib/services/arweave/arweave_service.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:collection'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:ardrive/core/crypto/crypto.dart'; import 'package:ardrive/entities/drive_signature.dart'; @@ -427,6 +428,82 @@ class ArweaveService { ); } + /// Streams entities parsed from the provided transactions one by one. + Stream streamEntitiesFromTransactions( + List entityTxs, + SecretKey? driveKey, { + required String ownerAddress, + required DriveID driveId, + }) async* { + final metadataCache = await MetadataCache.fromCacheStore( + await newSharedPreferencesCacheStore(), + ); + + for (final model in entityTxs) { + final transaction = model.transactionCommonMixin; + final tags = HashMap.fromIterable( + transaction.tags, + key: (tag) => tag.name, + value: (tag) => tag.value, + ); + + if (driveKey != null && tags[EntityTag.cipherIv] == null) { + logger.d('skipping unnecessary request for a broken entity'); + continue; + } + + if (transaction.block == null) { + break; + } + + final entityType = tags[EntityTag.entityType]; + final isSnapshot = entityType == EntityTypeTag.snapshot; + + Uint8List rawEntityData = Uint8List(0); + if (!isSnapshot) { + rawEntityData = await _getEntityData( + entityId: transaction.id, + driveId: driveId, + isPrivate: driveKey != null, + ); + await metadataCache.put(transaction.id, rawEntityData); + } + + try { + Entity? 
entity; + if (entityType == EntityTypeTag.drive) { + entity = await DriveEntity.fromTransaction( + transaction, _crypto, rawEntityData, driveKey); + } else if (entityType == EntityTypeTag.folder) { + entity = await FolderEntity.fromTransaction( + transaction, _crypto, rawEntityData, driveKey); + } else if (entityType == EntityTypeTag.file) { + entity = await FileEntity.fromTransaction( + transaction, + rawEntityData, + driveKey: driveKey, + crypto: _crypto, + ); + } + + if (entity != null && entity.ownerAddress == ownerAddress) { + yield entity; + } + } on EntityTransactionParseException catch (parseException) { + logger.w( + 'Failed to parse transaction with id ${parseException.transactionId}', + ); + } on GatewayError catch (fetchException) { + logger.e( + 'Failed to fetch entity data with the exception ${fetchException.runtimeType}' + ' for transaction ${transaction.id}, ' + ' with status ${fetchException.statusCode} ' + ' and reason ${fetchException.reasonPhrase}', + ); + } + } + } + Future hasUserPrivateDrives( Wallet wallet, { int maxRetries = defaultMaxRetries, diff --git a/lib/sync/domain/repositories/sync_repository.dart b/lib/sync/domain/repositories/sync_repository.dart index c855f9da5c..eae57ee535 100644 --- a/lib/sync/domain/repositories/sync_repository.dart +++ b/lib/sync/domain/repositories/sync_repository.dart @@ -845,19 +845,52 @@ class _SyncRepository implements SyncRepository { int batchSize, String ownerAddress, ) async { - final entityHistory = await _arweave.createDriveEntityHistoryFromTransactions( + final driveEntities = []; + final folderEntities = []; + final fileEntities = []; + var processed = 0; + + await for (final entity in _arweave.streamEntitiesFromTransactions( items, driveKey, - lastBlockHeight, driveId: drive.id, ownerAddress: ownerAddress, - ); + )) { + if (entity is DriveEntity) { + driveEntities.add(entity); + } else if (entity is FolderEntity) { + folderEntities.add(entity); + } else if (entity is FileEntity) { + fileEntities.add(entity); + } + + processed++; + if (processed >= batchSize) { + await _insertEntities( + drive, + driveEntities, + folderEntities, + fileEntities, + ); + driveEntities.clear(); + folderEntities.clear(); + fileEntities.clear(); + processed = 0; + } + } - final newEntities = entityHistory.blockHistory - .map((b) => b.entities) - .expand((entities) => entities); + if (driveEntities.isNotEmpty || + folderEntities.isNotEmpty || + fileEntities.isNotEmpty) { + await _insertEntities( + drive, + driveEntities, + folderEntities, + fileEntities, + ); + } - if (newEntities.length < batchSize) { + if (items.length < batchSize) { await _driveDao.writeToDrive(DrivesCompanion( id: Value(drive.id), lastBlockHeight: Value(currentBlockHeight), @@ -865,18 +898,25 @@ class _SyncRepository implements SyncRepository { isHidden: Value(drive.isHidden), )); } + } + Future _insertEntities( + Drive drive, + List driveEntities, + List folderEntities, + List fileEntities, + ) async { await _driveDao.runTransaction(() async { final latestDriveRevision = await _addNewDriveEntityRevisions( - newEntities: newEntities.whereType(), + newEntities: driveEntities, ); final latestFolderRevisions = await _addNewFolderEntityRevisions( driveId: drive.id, - newEntities: newEntities.whereType(), + newEntities: folderEntities, ); final latestFileRevisions = await _addNewFileEntityRevisions( driveId: drive.id, - newEntities: newEntities.whereType(), + newEntities: fileEntities, ); for (final entity in latestFileRevisions) { @@ -921,8 +961,6 @@ class _SyncRepository 
implements SyncRepository { latestFolderRevisions.clear(); latestFileRevisions.clear(); }); - - return; } /// Computes the new drive revisions from the provided entities, inserts them into the database, From 56158fc18317acaee25a7e1c270e9979a748e3a1 Mon Sep 17 00:00:00 2001 From: Atticus Date: Mon, 16 Jun 2025 12:05:12 -0700 Subject: [PATCH 5/5] Stream transactions directly into database --- .../domain/repositories/sync_repository.dart | 92 ++++--------------- lib/sync/utils/batch_processor.dart | 23 ----- test/sync/utils/batch_processor_test.dart | 40 -------- 3 files changed, 16 insertions(+), 139 deletions(-) diff --git a/lib/sync/domain/repositories/sync_repository.dart b/lib/sync/domain/repositories/sync_repository.dart index eae57ee535..7b82329dc7 100644 --- a/lib/sync/domain/repositories/sync_repository.dart +++ b/lib/sync/domain/repositories/sync_repository.dart @@ -187,8 +187,6 @@ class _SyncRepository implements SyncRepository { ? 0 : _calculateSyncLastBlockHeight(drive.lastBlockHeight!), currentBlockHeight: currentBlockHeight, - transactionParseBatchSize: - 200 ~/ (syncProgress.drivesCount - syncProgress.drivesSynced), ownerAddress: drive.ownerAddress, txFechedCallback: txFechedCallback, ); @@ -301,7 +299,6 @@ class _SyncRepository implements SyncRepository { ownerAddress: ownerAddress, lastBlockHeight: 0, currentBlockHeight: 0, - transactionParseBatchSize: 200, txFechedCallback: txFechedCallback, ); } @@ -538,7 +535,6 @@ class _SyncRepository implements SyncRepository { SecretKey? cipherKey, required int currentBlockHeight, required int lastBlockHeight, - required int transactionParseBatchSize, required String ownerAddress, Function(String driveId, int txCount)? txFechedCallback, }) async* { @@ -629,9 +625,6 @@ class _SyncRepository implements SyncRepository { drive: drive, driveKey: driveKey?.key, currentBlockHeight: currentBlockHeight, - lastBlockHeight: lastBlockHeight, - batchSize: transactionParseBatchSize, - snapshotDriveHistory: snapshotDriveHistory, ownerAddress: ownerAddress, ); } catch (e) { @@ -727,10 +720,7 @@ class _SyncRepository implements SyncRepository { required Stream transactions, required Drive drive, required SecretKey? driveKey, - required int lastBlockHeight, required int currentBlockHeight, - required int batchSize, - required SnapshotDriveHistory snapshotDriveHistory, required String ownerAddress, }) { final controller = StreamController(); @@ -743,8 +733,6 @@ class _SyncRepository implements SyncRepository { ? 0 : numberOfDriveEntitiesParsed / numberOfDriveEntitiesToParse; - final batch = []; - int? 
firstBlockHeight; late int totalBlockHeightDifference; var fetchPhasePercentage = 0.0; @@ -769,7 +757,6 @@ class _SyncRepository implements SyncRepository { } } - batch.add(t); numberOfDriveEntitiesToParse++; if (firstBlockHeight != null) { @@ -781,37 +768,15 @@ class _SyncRepository implements SyncRepository { controller.add(fetchPhasePercentage * fetchPhaseWeight); } - if (batch.length >= batchSize) { - await _processTransactionBatch( - batch, - drive, - driveKey, - lastBlockHeight, - currentBlockHeight, - batchSize, - ownerAddress, - ); - numberOfDriveEntitiesParsed += batch.length; - controller.add(fetchPhaseWeight + - driveEntityParseProgress() * parsePhaseWeight); - batch.clear(); - } - } - - if (batch.isNotEmpty) { - await _processTransactionBatch( - batch, + await _processTransaction( + t, drive, driveKey, - lastBlockHeight, - currentBlockHeight, - batchSize, ownerAddress, ); - numberOfDriveEntitiesParsed += batch.length; + numberOfDriveEntitiesParsed++; controller.add(fetchPhaseWeight + driveEntityParseProgress() * parsePhaseWeight); - batch.clear(); } if (numberOfDriveEntitiesToParse == 0) { @@ -836,22 +801,18 @@ class _SyncRepository implements SyncRepository { return controller.stream; } - Future _processTransactionBatch( - List items, + Future _processTransaction( + DriveEntityHistoryTransactionModel item, Drive drive, SecretKey? driveKey, - int lastBlockHeight, - int currentBlockHeight, - int batchSize, String ownerAddress, ) async { final driveEntities = []; final folderEntities = []; final fileEntities = []; - var processed = 0; await for (final entity in _arweave.streamEntitiesFromTransactions( - items, + [item], driveKey, driveId: drive.id, ownerAddress: ownerAddress, @@ -863,41 +824,20 @@ class _SyncRepository implements SyncRepository { } else if (entity is FileEntity) { fileEntities.add(entity); } - - processed++; - if (processed >= batchSize) { - await _insertEntities( - drive, - driveEntities, - folderEntities, - fileEntities, - ); - driveEntities.clear(); - folderEntities.clear(); - fileEntities.clear(); - processed = 0; - } } - if (driveEntities.isNotEmpty || - folderEntities.isNotEmpty || - fileEntities.isNotEmpty) { - await _insertEntities( - drive, - driveEntities, - folderEntities, - fileEntities, - ); + if (driveEntities.isEmpty && + folderEntities.isEmpty && + fileEntities.isEmpty) { + return; } - if (items.length < batchSize) { - await _driveDao.writeToDrive(DrivesCompanion( - id: Value(drive.id), - lastBlockHeight: Value(currentBlockHeight), - syncCursor: const Value(null), - isHidden: Value(drive.isHidden), - )); - } + await _insertEntities( + drive, + driveEntities, + folderEntities, + fileEntities, + ); } Future _insertEntities( diff --git a/lib/sync/utils/batch_processor.dart b/lib/sync/utils/batch_processor.dart index 0f80331db3..8c15f526b9 100644 --- a/lib/sync/utils/batch_processor.dart +++ b/lib/sync/utils/batch_processor.dart @@ -29,27 +29,4 @@ class BatchProcessor { list.clear(); } - - Stream batchProcessStream({ - required Stream stream, - required Stream Function(List items) endOfBatchCallback, - required int batchSize, - }) async* { - if (batchSize <= 0) { - throw ArgumentError('Batch size cannot be 0'); - } - - final buffer = []; - await for (final item in stream) { - buffer.add(item); - if (buffer.length >= batchSize) { - yield* endOfBatchCallback(List.from(buffer)); - buffer.clear(); - } - } - - if (buffer.isNotEmpty) { - yield* endOfBatchCallback(List.from(buffer)); - } - } } diff --git a/test/sync/utils/batch_processor_test.dart 
b/test/sync/utils/batch_processor_test.dart index ccf0c25d30..aa635ddcf5 100644 --- a/test/sync/utils/batch_processor_test.dart +++ b/test/sync/utils/batch_processor_test.dart @@ -1,5 +1,3 @@ -import 'dart:async'; - import 'package:ardrive/sync/utils/batch_processor.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:mocktail/mocktail.dart'; @@ -20,44 +18,6 @@ void main() { }); }); - group('batchProcessStream', () { - test('should produce no output for an empty stream', () { - final processor = BatchProcessor(); - final controller = StreamController(); - controller.close(); - - expect( - processor.batchProcessStream( - stream: controller.stream, - endOfBatchCallback: mockCallback.call, - batchSize: 5), - emitsDone); - }); - - test('should split stream into batches', () { - final processor = BatchProcessor(); - final controller = StreamController(); - - when(() => mockCallback(any())).thenAnswer((_) async* { - yield 1.0; - }); - - final stream = processor.batchProcessStream( - stream: controller.stream, - endOfBatchCallback: mockCallback.call, - batchSize: 3, - ); - - controller.add(1); - controller.add(2); - controller.add(3); - controller.add(4); - controller.close(); - - expect(stream, emitsInOrder([1.0, 1.0, emitsDone])); - }); - }); - test('should produce no output for an empty list', () { final processor = BatchProcessor(); expect(