Skip to content

Commit f5cdd0e

Browse files
fix streaming sync
1 parent c05b29c commit f5cdd0e

File tree

2 files changed

+28
-42
lines changed

2 files changed

+28
-42
lines changed

lib/main.dart

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import 'package:ardrive/shared/blocs/banner/app_banner_bloc.dart';
2626
import 'package:ardrive/sharing/blocs/sharing_file_bloc.dart';
2727
import 'package:ardrive/sync/data/snapshot_validation_service.dart';
2828
import 'package:ardrive/sync/domain/repositories/sync_repository.dart';
29-
import 'package:ardrive/theme/theme_switcher_bloc.dart';
3029
import 'package:ardrive/theme/theme_switcher_state.dart';
3130
import 'package:ardrive/turbo/services/payment_service.dart';
3231
import 'package:ardrive/turbo/services/upload_service.dart';

lib/sync/domain/repositories/sync_repository.dart

Lines changed: 28 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import 'package:ardrive/utils/snapshots/snapshot_drive_history.dart';
3838
import 'package:ardrive/utils/snapshots/snapshot_item.dart';
3939
import 'package:ardrive_utils/ardrive_utils.dart';
4040
import 'package:ario_sdk/ario_sdk.dart';
41-
import 'package:pool/pool.dart';
4241
import 'package:arweave/arweave.dart';
4342
import 'package:cryptography/cryptography.dart';
4443
import 'package:drift/drift.dart';
@@ -608,6 +607,8 @@ class _SyncRepository implements SyncRepository {
608607
ownerAddress: ownerAddress,
609608
);
610609

610+
txFechedCallback?.call(drive.id, gqlDriveHistory.txCount);
611+
611612
logger.d('Total range to query for: ${totalRangeToQueryFor.rangeSegments}\n'
612613
'Sub ranges in snapshots (DRIVE ID: $driveId): ${snapshotDriveHistory.subRanges.rangeSegments}\n'
613614
'Sub ranges in GQL (DRIVE ID: $driveId): ${gqlDriveHistorySubRanges.rangeSegments}');
@@ -770,40 +771,6 @@ class _SyncRepository implements SyncRepository {
770771
fileEntities.clear();
771772
}
772773

773-
final pool = Pool(_entityBatchSize);
774-
final runningTasks = <Future<void>>[];
775-
776-
Future<void> parseTransaction(DriveEntityHistoryTransactionModel t) async {
777-
await for (final entity in _arweave.streamEntitiesFromTransactions(
778-
Stream.value(t),
779-
driveKey,
780-
driveId: drive.id,
781-
ownerAddress: ownerAddress,
782-
concurrency: 1,
783-
)) {
784-
if (entity is DriveEntity) {
785-
driveEntities.add(entity);
786-
} else if (entity is FolderEntity) {
787-
folderEntities.add(entity);
788-
} else if (entity is FileEntity) {
789-
fileEntities.add(entity);
790-
}
791-
}
792-
793-
numberOfDriveEntitiesParsed++;
794-
795-
if (driveEntities.length +
796-
folderEntities.length +
797-
fileEntities.length >=
798-
_entityBatchSize) {
799-
await flush();
800-
}
801-
802-
controller.add(
803-
fetchPhaseWeight + driveEntityParseProgress() * parsePhaseWeight,
804-
);
805-
}
806-
807774
await for (final t in transactions) {
808775
if (firstBlockHeight == null) {
809776
final block = t.transactionCommonMixin.block;
@@ -824,16 +791,36 @@ class _SyncRepository implements SyncRepository {
824791
controller.add(fetchPhasePercentage * fetchPhaseWeight);
825792
}
826793

827-
final future = pool.withResource(() => parseTransaction(t));
828-
runningTasks.add(future);
829-
future.whenComplete(() => runningTasks.remove(future));
794+
await for (final entity in _arweave.streamEntitiesFromTransactions(
795+
Stream.value(t),
796+
driveKey,
797+
driveId: drive.id,
798+
ownerAddress: ownerAddress,
799+
concurrency: _entityBatchSize,
800+
)) {
801+
if (entity is DriveEntity) {
802+
driveEntities.add(entity);
803+
} else if (entity is FolderEntity) {
804+
folderEntities.add(entity);
805+
} else if (entity is FileEntity) {
806+
fileEntities.add(entity);
807+
}
830808

831-
if (runningTasks.length >= _entityBatchSize) {
832-
await Future.any(runningTasks);
809+
if (driveEntities.length +
810+
folderEntities.length +
811+
fileEntities.length >=
812+
_entityBatchSize) {
813+
await flush();
814+
}
833815
}
816+
817+
numberOfDriveEntitiesParsed++;
818+
819+
controller.add(
820+
fetchPhaseWeight + driveEntityParseProgress() * parsePhaseWeight,
821+
);
834822
}
835823

836-
await Future.wait(runningTasks);
837824
await flush();
838825

839826
if (numberOfDriveEntitiesToParse == 0) {

0 commit comments

Comments
 (0)