Skip to content

Commit 56158fc

Browse files
Stream transactions directly into database
1 parent f485cb1 commit 56158fc

File tree

3 files changed

+16
-139
lines changed

3 files changed

+16
-139
lines changed

lib/sync/domain/repositories/sync_repository.dart

Lines changed: 16 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,6 @@ class _SyncRepository implements SyncRepository {
187187
? 0
188188
: _calculateSyncLastBlockHeight(drive.lastBlockHeight!),
189189
currentBlockHeight: currentBlockHeight,
190-
transactionParseBatchSize:
191-
200 ~/ (syncProgress.drivesCount - syncProgress.drivesSynced),
192190
ownerAddress: drive.ownerAddress,
193191
txFechedCallback: txFechedCallback,
194192
);
@@ -301,7 +299,6 @@ class _SyncRepository implements SyncRepository {
301299
ownerAddress: ownerAddress,
302300
lastBlockHeight: 0,
303301
currentBlockHeight: 0,
304-
transactionParseBatchSize: 200,
305302
txFechedCallback: txFechedCallback,
306303
);
307304
}
@@ -538,7 +535,6 @@ class _SyncRepository implements SyncRepository {
538535
SecretKey? cipherKey,
539536
required int currentBlockHeight,
540537
required int lastBlockHeight,
541-
required int transactionParseBatchSize,
542538
required String ownerAddress,
543539
Function(String driveId, int txCount)? txFechedCallback,
544540
}) async* {
@@ -629,9 +625,6 @@ class _SyncRepository implements SyncRepository {
629625
drive: drive,
630626
driveKey: driveKey?.key,
631627
currentBlockHeight: currentBlockHeight,
632-
lastBlockHeight: lastBlockHeight,
633-
batchSize: transactionParseBatchSize,
634-
snapshotDriveHistory: snapshotDriveHistory,
635628
ownerAddress: ownerAddress,
636629
);
637630
} catch (e) {
@@ -727,10 +720,7 @@ class _SyncRepository implements SyncRepository {
727720
required Stream<DriveEntityHistoryTransactionModel> transactions,
728721
required Drive drive,
729722
required SecretKey? driveKey,
730-
required int lastBlockHeight,
731723
required int currentBlockHeight,
732-
required int batchSize,
733-
required SnapshotDriveHistory snapshotDriveHistory,
734724
required String ownerAddress,
735725
}) {
736726
final controller = StreamController<double>();
@@ -743,8 +733,6 @@ class _SyncRepository implements SyncRepository {
743733
? 0
744734
: numberOfDriveEntitiesParsed / numberOfDriveEntitiesToParse;
745735

746-
final batch = <DriveEntityHistoryTransactionModel>[];
747-
748736
int? firstBlockHeight;
749737
late int totalBlockHeightDifference;
750738
var fetchPhasePercentage = 0.0;
@@ -769,7 +757,6 @@ class _SyncRepository implements SyncRepository {
769757
}
770758
}
771759

772-
batch.add(t);
773760
numberOfDriveEntitiesToParse++;
774761

775762
if (firstBlockHeight != null) {
@@ -781,37 +768,15 @@ class _SyncRepository implements SyncRepository {
781768
controller.add(fetchPhasePercentage * fetchPhaseWeight);
782769
}
783770

784-
if (batch.length >= batchSize) {
785-
await _processTransactionBatch(
786-
batch,
787-
drive,
788-
driveKey,
789-
lastBlockHeight,
790-
currentBlockHeight,
791-
batchSize,
792-
ownerAddress,
793-
);
794-
numberOfDriveEntitiesParsed += batch.length;
795-
controller.add(fetchPhaseWeight +
796-
driveEntityParseProgress() * parsePhaseWeight);
797-
batch.clear();
798-
}
799-
}
800-
801-
if (batch.isNotEmpty) {
802-
await _processTransactionBatch(
803-
batch,
771+
await _processTransaction(
772+
t,
804773
drive,
805774
driveKey,
806-
lastBlockHeight,
807-
currentBlockHeight,
808-
batchSize,
809775
ownerAddress,
810776
);
811-
numberOfDriveEntitiesParsed += batch.length;
777+
numberOfDriveEntitiesParsed++;
812778
controller.add(fetchPhaseWeight +
813779
driveEntityParseProgress() * parsePhaseWeight);
814-
batch.clear();
815780
}
816781

817782
if (numberOfDriveEntitiesToParse == 0) {
@@ -836,22 +801,18 @@ class _SyncRepository implements SyncRepository {
836801
return controller.stream;
837802
}
838803

839-
Future<void> _processTransactionBatch(
840-
List<DriveEntityHistoryTransactionModel> items,
804+
Future<void> _processTransaction(
805+
DriveEntityHistoryTransactionModel item,
841806
Drive drive,
842807
SecretKey? driveKey,
843-
int lastBlockHeight,
844-
int currentBlockHeight,
845-
int batchSize,
846808
String ownerAddress,
847809
) async {
848810
final driveEntities = <DriveEntity>[];
849811
final folderEntities = <FolderEntity>[];
850812
final fileEntities = <FileEntity>[];
851-
var processed = 0;
852813

853814
await for (final entity in _arweave.streamEntitiesFromTransactions(
854-
items,
815+
[item],
855816
driveKey,
856817
driveId: drive.id,
857818
ownerAddress: ownerAddress,
@@ -863,41 +824,20 @@ class _SyncRepository implements SyncRepository {
863824
} else if (entity is FileEntity) {
864825
fileEntities.add(entity);
865826
}
866-
867-
processed++;
868-
if (processed >= batchSize) {
869-
await _insertEntities(
870-
drive,
871-
driveEntities,
872-
folderEntities,
873-
fileEntities,
874-
);
875-
driveEntities.clear();
876-
folderEntities.clear();
877-
fileEntities.clear();
878-
processed = 0;
879-
}
880827
}
881828

882-
if (driveEntities.isNotEmpty ||
883-
folderEntities.isNotEmpty ||
884-
fileEntities.isNotEmpty) {
885-
await _insertEntities(
886-
drive,
887-
driveEntities,
888-
folderEntities,
889-
fileEntities,
890-
);
829+
if (driveEntities.isEmpty &&
830+
folderEntities.isEmpty &&
831+
fileEntities.isEmpty) {
832+
return;
891833
}
892834

893-
if (items.length < batchSize) {
894-
await _driveDao.writeToDrive(DrivesCompanion(
895-
id: Value(drive.id),
896-
lastBlockHeight: Value(currentBlockHeight),
897-
syncCursor: const Value(null),
898-
isHidden: Value(drive.isHidden),
899-
));
900-
}
835+
await _insertEntities(
836+
drive,
837+
driveEntities,
838+
folderEntities,
839+
fileEntities,
840+
);
901841
}
902842

903843
Future<void> _insertEntities(

lib/sync/utils/batch_processor.dart

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,4 @@ class BatchProcessor {
2929

3030
list.clear();
3131
}
32-
33-
Stream<double> batchProcessStream<T>({
34-
required Stream<T> stream,
35-
required Stream<double> Function(List<T> items) endOfBatchCallback,
36-
required int batchSize,
37-
}) async* {
38-
if (batchSize <= 0) {
39-
throw ArgumentError('Batch size cannot be 0');
40-
}
41-
42-
final buffer = <T>[];
43-
await for (final item in stream) {
44-
buffer.add(item);
45-
if (buffer.length >= batchSize) {
46-
yield* endOfBatchCallback(List<T>.from(buffer));
47-
buffer.clear();
48-
}
49-
}
50-
51-
if (buffer.isNotEmpty) {
52-
yield* endOfBatchCallback(List<T>.from(buffer));
53-
}
54-
}
5532
}

test/sync/utils/batch_processor_test.dart

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import 'dart:async';
2-
31
import 'package:ardrive/sync/utils/batch_processor.dart';
42
import 'package:flutter_test/flutter_test.dart';
53
import 'package:mocktail/mocktail.dart';
@@ -20,44 +18,6 @@ void main() {
2018
});
2119
});
2220

23-
group('batchProcessStream', () {
24-
test('should produce no output for an empty stream', () {
25-
final processor = BatchProcessor();
26-
final controller = StreamController<int>();
27-
controller.close();
28-
29-
expect(
30-
processor.batchProcessStream<int>(
31-
stream: controller.stream,
32-
endOfBatchCallback: mockCallback.call,
33-
batchSize: 5),
34-
emitsDone);
35-
});
36-
37-
test('should split stream into batches', () {
38-
final processor = BatchProcessor();
39-
final controller = StreamController<int>();
40-
41-
when(() => mockCallback(any())).thenAnswer((_) async* {
42-
yield 1.0;
43-
});
44-
45-
final stream = processor.batchProcessStream<int>(
46-
stream: controller.stream,
47-
endOfBatchCallback: mockCallback.call,
48-
batchSize: 3,
49-
);
50-
51-
controller.add(1);
52-
controller.add(2);
53-
controller.add(3);
54-
controller.add(4);
55-
controller.close();
56-
57-
expect(stream, emitsInOrder([1.0, 1.0, emitsDone]));
58-
});
59-
});
60-
6121
test('should produce no output for an empty list', () {
6222
final processor = BatchProcessor();
6323
expect(

0 commit comments

Comments
 (0)