diff --git a/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs b/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs index 40f084b2..da86f74e 100644 --- a/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs +++ b/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs @@ -39,6 +39,8 @@ public SnapshotConfig(Configuration.Config config) WarnOnAutoInitializeFail = config.GetBoolean("warn-on-auto-init-fail"); ReadIsolationLevel = config.GetIsolationLevel("read-isolation-level"); WriteIsolationLevel = config.GetIsolationLevel("write-isolation-level"); + MaxSubStreamsForReads = config.GetInt("max-substreams-for-reads", 8); + MaxBatchPerSubStreamRead = config.GetInt("max-batch-per-substream-read", 25); } public string? UseSharedDb { get; } @@ -67,5 +69,8 @@ public SnapshotConfig(Configuration.Config config) public IsolationLevel WriteIsolationLevel { get; } public IsolationLevel ReadIsolationLevel { get; } + public int MaxSubStreamsForReads { get; } + + public int MaxBatchPerSubStreamRead { get; } } } diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index dab5541f..1e52f6d2 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -5,20 +5,80 @@ // ----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Data; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Event; using Akka.Persistence.Sql.Config; using Akka.Persistence.Sql.Db; using Akka.Persistence.Sql.Extensions; using Akka.Streams; +using Akka.Streams.Dsl; using Akka.Util; using LinqToDB; +using LinqToDB.Tools; namespace Akka.Persistence.Sql.Snapshot { + public static class SubFlowExtensions + { + public static Source MergeSubStreamsAsSource(this SubFlow subFlow) + { + return (Source)(subFlow.MergeSubstreams()); + } + } + + public class LatestSnapRequestEntry + { + public LatestSnapRequestEntry(string persistenceId, CancellationToken cancellationToken) + { + PersistenceId = persistenceId; + CTS = cancellationToken; + TCS = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public readonly string PersistenceId; + public readonly TaskCompletionSource> TCS; + public readonly CancellationToken CTS; + } + + public readonly record struct SnapshotReadGroup + { + public SnapshotReadGroup(QueryLatestSnapSet a, List b, Exception? err) + { + this.a = a; + this.b = b; + this.err = err; + } + public QueryLatestSnapSet a { get; } + public List b { get; } + public Exception? err { get; } + public void Deconstruct(out QueryLatestSnapSet a, out List b, out Exception? err) + { + a = this.a; + b = this.b; + err = this.err; + } + } + + public class QueryLatestSnapSet + { + public readonly Dictionary> Entries = new(); + + public void Add(LatestSnapRequestEntry entry) + { + if (Entries.TryGetValue(entry.PersistenceId, out var item) == false) + { + item = Entries[entry.PersistenceId] = new List(); + } + item.Add(entry); + } + } + public class ByteArraySnapshotDao : ISnapshotDao { private readonly AkkaPersistenceDataConnectionFactory _connectionFactory; @@ -29,6 +89,10 @@ public class ByteArraySnapshotDao : ISnapshotDao private readonly CancellationTokenSource _shutdownCts; private readonly SnapshotConfig _snapshotConfig; private readonly IsolationLevel _writeIsolationLevel; + private readonly Channel _pendingLatestChannel; + private readonly Task _latestSnapStream; + private readonly int _maxSubStreamsForReads; + private readonly int _maxBatchPerSubStreamRead; public ByteArraySnapshotDao( AkkaPersistenceDataConnectionFactory connectionFactory, @@ -48,6 +112,210 @@ public ByteArraySnapshotDao( _readIsolationLevel = snapshotConfig.ReadIsolationLevel; _shutdownCts = new CancellationTokenSource(); + _pendingLatestChannel = Channel.CreateUnbounded(); + _maxSubStreamsForReads = _snapshotConfig.MaxSubStreamsForReads; + _maxBatchPerSubStreamRead = _snapshotConfig.MaxBatchPerSubStreamRead; + _latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader) + .GroupBy(_maxSubStreamsForReads, a=> a.PersistenceId.GetHashCode()% _maxSubStreamsForReads) + .BatchWeighted( + _maxBatchPerSubStreamRead, + a => 1, + e => + { + var a = new QueryLatestSnapSet(); + a.Add(e); + return a; + }, + (a, e) => + { + a.Add(e); + return a; + }) + .SelectAsync(1, + async a => + { + + using (var connection = _connectionFactory.GetConnection()) + { + if (connection.UseDateTime) + { + //TODO: Consolidate/fixup different rowtype issues. + try + { + var set = await connection.GetTable() + .Where(r => r.PersistenceId.In(a.Entries.Keys)) + .Select( + r => new + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + }) + .Where(r => r.RowNum == 1) + .Select( + r => new LongSnapshotRow() + { + Created = r.Created.Ticks, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + }).ToListAsync(); + return new SnapshotReadGroup(a, set, err: (Exception?)null); + } + catch (Exception ex) + { + return new (a, null, err: ex); + } + } + else + { + try + { + var set = await connection.GetTable() + .Where(r => r.PersistenceId.In(a.Entries.Keys)) + .Select( + r => new + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + LatestRowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + }) + .Where(r => r.LatestRowNum == 1) + .Select( + r => new LongSnapshotRow() + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + }).ToListAsync(); + return new (a, set, err: (Exception?)null); + } + catch (Exception ex) + { + return new (a, null, err: ex); + } + } + } + }).SelectAsync(1, + async (ab) => + { + var (a, b, c) = ab; + if (c != null) + { + foreach (var taskCompletionSourcese in a!.Entries.Values.ToList()) + { + foreach (var taskCompletionSource in taskCompletionSourcese) + { + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + + taskCompletionSource.TCS.TrySetException(c); + } + } + } + } + else + { + //TODO: Pool this set: + var tempSet = new List(); + if (b.Count == 0) + { + foreach (var keyValuePair in a.Entries) + { + foreach (var taskCompletionSource in keyValuePair.Value) + { + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetResult(Option.None); + } + } + } + } + foreach (var result in b) + { + if (a.Entries.TryGetValue(result.PersistenceId, out var toSet)) + { + try + { + var res = _longSerializer.Deserialize(result); + if (res.IsSuccess) + { + foreach (var taskCompletionSource in toSet) + { + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetResult(res.Success); + } + } + } + else + { + foreach (var taskCompletionSource in toSet) + { + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetException(res.Failure.Value); + } + } + } + } + catch (Exception e) + { + foreach (var taskCompletionSource in toSet) + { + taskCompletionSource.TCS.TrySetException(e); + } + } + } + else + { + tempSet.Add(result.PersistenceId); + } + + foreach (var se in tempSet) + { + if (a.Entries.TryGetValue(se, out var setNo)) + { + foreach (var taskCompletionSource in setNo) + { + taskCompletionSource.TCS.TrySetResult(Option.None); + } + } + } + } + } + + return Done.Instance; + }).MergeSubStreamsAsSource().RunWith(Sink.Ignore(), materializer); } public async Task DeleteAllSnapshotsAsync( @@ -182,41 +450,50 @@ await connection }); } - public async Task> LatestSnapshotAsync( + public Task> LatestSnapshotAsync( string persistenceId, CancellationToken cancellationToken = default) { - var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token); - return await _connectionFactory.ExecuteWithTransactionAsync( - _readIsolationLevel, - cts.Token, - async (connection, token) => - { - if (connection.UseDateTime) - { - var row = await connection - .GetTable() - .Where(r => r.PersistenceId == persistenceId) - .OrderByDescending(t => t.SequenceNumber) - .FirstOrDefaultAsync(token); - - return row != null - ? _dateTimeSerializer.Deserialize(row).Get() - : Option.None; - } - else - { - var row = await connection - .GetTable() - .Where(r => r.PersistenceId == persistenceId) - .OrderByDescending(t => t.SequenceNumber) - .FirstOrDefaultAsync(token); - - return row != null - ? _longSerializer.Deserialize(row).Get() - : Option.None; - } - }); + var req = new LatestSnapRequestEntry(persistenceId, cancellationToken); + if (_pendingLatestChannel.Writer.TryWrite(req)) + { + return req.TCS.Task; + } + else + { + return Task.FromException>(new Exception("Queue is closed, System may be shutting down!")); + } + //var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token); + //return await _connectionFactory.ExecuteWithTransactionAsync( + // _readIsolationLevel, + // cts.Token, + // async (connection, token) => + // { + // if (connection.UseDateTime) + // { + // var row = await connection + // .GetTable() + // .Where(r => r.PersistenceId == persistenceId) + // .OrderByDescending(t => t.SequenceNumber) + // .FirstOrDefaultAsync(token); + // + // return row != null + // ? _dateTimeSerializer.Deserialize(row).Get() + // : Option.None; + // } + // else + // { + // var row = await connection + // .GetTable() + // .Where(r => r.PersistenceId == persistenceId) + // .OrderByDescending(t => t.SequenceNumber) + // .FirstOrDefaultAsync(token); + // + // return row != null + // ? _longSerializer.Deserialize(row).Get() + // : Option.None; + // } + // }); } public async Task> SnapshotForMaxTimestampAsync( diff --git a/src/Akka.Persistence.Sql/snapshot.conf b/src/Akka.Persistence.Sql/snapshot.conf index c6b8ef8d..ae08c579 100644 --- a/src/Akka.Persistence.Sql/snapshot.conf +++ b/src/Akka.Persistence.Sql/snapshot.conf @@ -49,6 +49,14 @@ # https://learn.microsoft.com/en-us/dotnet/api/system.data.isolationlevel?#fields # Valid values: "read-committed", "read-uncommitted", "repeatable-read", "serializable", "snapshot", or "unspecified" write-isolation-level = unspecified + + # The max number of substreams for reads. + # Ideally a power of 2. + max-substreams-for-reads = 16 + + # The max number of reads per substream. + # For smaller snapshots and overall 'grouping', one can use a larger batch for throughput. + max-batch-per-substream-read = 25 default { schema-name = null