From 22eaa31e553feb5addaeeda79f2ec39762b933a0 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 17:24:36 -0400 Subject: [PATCH 1/8] WIP Snapshot Load Batch Queue --- .../Snapshot/ByteArraySnapshotDao.cs | 269 +++++++++++++++--- 1 file changed, 237 insertions(+), 32 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index 2ad8f233..b09b4d56 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -5,20 +5,49 @@ // ----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Data; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Event; using Akka.Persistence.Sql.Config; using Akka.Persistence.Sql.Db; using Akka.Persistence.Sql.Extensions; using Akka.Streams; +using Akka.Streams.Dsl; using Akka.Util; using LinqToDB; +using LinqToDB.Tools; namespace Akka.Persistence.Sql.Snapshot { + public class LatestSnapRequestEntry + { + public LatestSnapRequestEntry(string persistenceId) + { + PersistenceId = persistenceId; + TCS = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public readonly string PersistenceId; + public readonly TaskCompletionSource> TCS; + } + + public class QueryLatestSnapSet + { + public readonly Dictionary>>> Entries = new(); + + public void Add(LatestSnapRequestEntry entry) + { + if (Entries.TryGetValue(entry.PersistenceId, out var item) == false) + { + item = Entries[entry.PersistenceId] = new List>>(); + } + item.Add(entry.TCS); + } + } public class ByteArraySnapshotDao : ISnapshotDao { private readonly AkkaPersistenceDataConnectionFactory _connectionFactory; @@ -29,6 +58,8 @@ public class ByteArraySnapshotDao : ISnapshotDao private readonly CancellationTokenSource _shutdownCts; private readonly SnapshotConfig _snapshotConfig; private readonly IsolationLevel _writeIsolationLevel; + private readonly Channel _pendingLatestChannel; + private readonly Task _latestSnapStream; public ByteArraySnapshotDao( AkkaPersistenceDataConnectionFactory connectionFactory, @@ -48,6 +79,171 @@ public ByteArraySnapshotDao( _readIsolationLevel = snapshotConfig.ReadIsolationLevel; _shutdownCts = new CancellationTokenSource(); + _pendingLatestChannel = Channel.CreateUnbounded(); + _latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader).BatchWeighted( + 50, + a => 1, + e => + { + var a = new QueryLatestSnapSet(); + a.Add(e); + return a; + }, + (a, e) => + { + a.Add(e); + return a; + }) + .SelectAsync( + 1, + async a => + { + + using (var connection = _connectionFactory.GetConnection()) + { + if (connection.UseDateTime) + { + //TODO: Make this actually work because at some point we split tables, may need to generalize. + var set = await connection.GetTable() + .Where(r => r.PersistenceId.In(a.Entries.Keys)) + .Select( + r => new + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + }) + .Where(r => r.RowNum == 1) + .Select( + r => new LongSnapshotRow() + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + }).ToListAsync(); + return (a, set, err: (Exception?)null); + } + else + { + try + { + var set = await connection.GetTable() + .Where(r => r.PersistenceId.In(a.Entries.Keys)) + .Select( + r => new + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + }) + .Where(r => r.RowNum == 1) + .Select( + r => new LongSnapshotRow() + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + }).ToListAsync(); + return (a, set, err: (Exception?)null); + } + catch (Exception ex) + { + return (a, null, err: ex); + } + } + } + }).Async().Select( + (ab) => + { + var (a, b, c) = ab; + if (c != null) + { + foreach (var taskCompletionSourcese in a!.Entries.Values.ToList()) + { + foreach (var taskCompletionSource in taskCompletionSourcese) + { + taskCompletionSource.TrySetException(c); + } + } + } + else + { + //TODO: Pool this set: + var tempSet = new List(); + if (b.Count == 0) + { + foreach (var keyValuePair in a.Entries) + { + foreach (var taskCompletionSource in keyValuePair.Value) + { + taskCompletionSource.TrySetResult(Option.None); + } + } + } + foreach (var result in b) + { + if (a.Entries.TryGetValue(result.PersistenceId, out var toSet)) + { + try + { + var res = _longSerializer.Deserialize(result); + if (res.IsSuccess) + { + foreach (var taskCompletionSource in toSet) + { + taskCompletionSource.TrySetResult(res.Success); + } + } + else + { + foreach (var taskCompletionSource in toSet) + { + taskCompletionSource.TrySetException(res.Failure.Value); + } + } + } + catch (Exception e) + { + foreach (var taskCompletionSource in toSet) + { + taskCompletionSource.TrySetException(e); + } + } + } + else + { + tempSet.Add(result.PersistenceId); + } + + foreach (var se in tempSet) + { + if (a.Entries.TryGetValue(se, out var setNo)) + { + foreach (var taskCompletionSource in setNo) + { + taskCompletionSource.TrySetResult(Option.None); + } + } + } + } + } + + return Done.Instance; + }).RunWith(Sink.Ignore(), materializer); } public async Task DeleteAllSnapshotsAsync( @@ -182,41 +378,50 @@ await connection }); } - public async Task> LatestSnapshotAsync( + public Task> LatestSnapshotAsync( string persistenceId, CancellationToken cancellationToken = default) { - var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token); - return await _connectionFactory.ExecuteWithTransactionAsync( - _readIsolationLevel, - cts.Token, - async (connection, token) => - { - if (connection.UseDateTime) - { - var row = await connection - .GetTable() - .Where(r => r.PersistenceId == persistenceId) - .OrderByDescending(t => t.SequenceNumber) - .FirstOrDefaultAsync(token); - - return row != null - ? _dateTimeSerializer.Deserialize(row).Get() - : Option.None; - } - else - { - var row = await connection - .GetTable() - .Where(r => r.PersistenceId == persistenceId) - .OrderByDescending(t => t.SequenceNumber) - .FirstOrDefaultAsync(token); - - return row != null - ? _longSerializer.Deserialize(row).Get() - : Option.None; - } - }); + var req = new LatestSnapRequestEntry(persistenceId); + if (_pendingLatestChannel.Writer.TryWrite(req)) + { + return req.TCS.Task; + } + else + { + return Task.FromException>(new Exception("Queue is closed, System may be shutting down!")); + } + //var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token); + //return await _connectionFactory.ExecuteWithTransactionAsync( + // _readIsolationLevel, + // cts.Token, + // async (connection, token) => + // { + // if (connection.UseDateTime) + // { + // var row = await connection + // .GetTable() + // .Where(r => r.PersistenceId == persistenceId) + // .OrderByDescending(t => t.SequenceNumber) + // .FirstOrDefaultAsync(token); + // + // return row != null + // ? _dateTimeSerializer.Deserialize(row).Get() + // : Option.None; + // } + // else + // { + // var row = await connection + // .GetTable() + // .Where(r => r.PersistenceId == persistenceId) + // .OrderByDescending(t => t.SequenceNumber) + // .FirstOrDefaultAsync(token); + // + // return row != null + // ? _longSerializer.Deserialize(row).Get() + // : Option.None; + // } + // }); } public async Task> SnapshotForMaxTimestampAsync( From 39b3701ceaf286fba72eb746baadac7e7e5c9ba3 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 17:49:50 -0400 Subject: [PATCH 2/8] Find a DSL concern, commit before fixing --- .../Snapshot/ByteArraySnapshotDao.cs | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index b09b4d56..98882fd2 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -35,6 +35,25 @@ public LatestSnapRequestEntry(string persistenceId) public readonly TaskCompletionSource> TCS; } + public readonly record struct SnapshotReadGroup + { + public SnapshotReadGroup(QueryLatestSnapSet a, List b, Exception? err) + { + this.a = a; + this.b = b; + this.err = err; + } + public QueryLatestSnapSet a { get; } + public List b { get; } + public Exception? err { get; } + public void Deconstruct(out QueryLatestSnapSet a, out List b, out Exception? err) + { + a = this.a; + b = this.b; + err = this.err; + } + } + public class QueryLatestSnapSet { public readonly Dictionary>>> Entries = new(); @@ -80,8 +99,12 @@ public ByteArraySnapshotDao( _shutdownCts = new CancellationTokenSource(); _pendingLatestChannel = Channel.CreateUnbounded(); - _latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader).BatchWeighted( - 50, + int maxSubStreamsForReads = 8; // TODO: Configurable + int maxRequestsPerBatch = 50; + _latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader) + .GroupBy(maxSubStreamsForReads, a=> a.PersistenceId.GetHashCode()% maxSubStreamsForReads) + .BatchWeighted( + maxRequestsPerBatch, a => 1, e => { @@ -94,8 +117,7 @@ public ByteArraySnapshotDao( a.Add(e); return a; }) - .SelectAsync( - 1, + .SelectAsync(1, async a => { @@ -115,7 +137,7 @@ public ByteArraySnapshotDao( Manifest = r.Manifest, Payload = r.Payload, SerializerId = r.SerializerId, - RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue(), }) .Where(r => r.RowNum == 1) .Select( @@ -128,7 +150,7 @@ public ByteArraySnapshotDao( Payload = r.Payload, SerializerId = r.SerializerId, }).ToListAsync(); - return (a, set, err: (Exception?)null); + return new SnapshotReadGroup(a, set, (Exception?)null); } else { @@ -158,15 +180,15 @@ public ByteArraySnapshotDao( Payload = r.Payload, SerializerId = r.SerializerId, }).ToListAsync(); - return (a, set, err: (Exception?)null); + return new (a, set, err: (Exception?)null); } catch (Exception ex) { - return (a, null, err: ex); + return new (a, null, err: ex); } } } - }).Async().Select( + }).Select( (ab) => { var (a, b, c) = ab; From a3d9db229be949388431448cef864b8dc18f93f0 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 17:55:39 -0400 Subject: [PATCH 3/8] fix issue --- .../Snapshot/ByteArraySnapshotDao.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index 98882fd2..f316ce56 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -23,6 +23,15 @@ namespace Akka.Persistence.Sql.Snapshot { + public static class SubFlowExtensions + { + public static Source MergeSubStreamsAsSource(this SubFlow subFlow) + { + return (Source)(subFlow.MergeSubstreams()); + } + } + public class LatestSnapRequestEntry { public LatestSnapRequestEntry(string persistenceId) @@ -265,7 +274,7 @@ public ByteArraySnapshotDao( } return Done.Instance; - }).RunWith(Sink.Ignore(), materializer); + }).MergeSubStreamsAsSource().RunWith(Sink.Ignore(), materializer); } public async Task DeleteAllSnapshotsAsync( From 0be4e4c8e12c51c8f78f897bde4fcc182c29fc70 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 18:30:28 -0400 Subject: [PATCH 4/8] Fixup datetime case --- .../Snapshot/ByteArraySnapshotDao.cs | 59 +++++++++++-------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index f316ce56..98a45238 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -134,32 +134,39 @@ public ByteArraySnapshotDao( { if (connection.UseDateTime) { - //TODO: Make this actually work because at some point we split tables, may need to generalize. - var set = await connection.GetTable() - .Where(r => r.PersistenceId.In(a.Entries.Keys)) - .Select( - r => new - { - Created = r.Created, - PersistenceId = r.PersistenceId, - SequenceNumber = r.SequenceNumber, - Manifest = r.Manifest, - Payload = r.Payload, - SerializerId = r.SerializerId, - RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue(), - }) - .Where(r => r.RowNum == 1) - .Select( - r => new LongSnapshotRow() - { - Created = r.Created, - PersistenceId = r.PersistenceId, - SequenceNumber = r.SequenceNumber, - Manifest = r.Manifest, - Payload = r.Payload, - SerializerId = r.SerializerId, - }).ToListAsync(); - return new SnapshotReadGroup(a, set, (Exception?)null); + //TODO: Consolidate/fixup different rowtype issues. + try + { + var set = await connection.GetTable() + .Where(r => r.PersistenceId.In(a.Entries.Keys)) + .Select( + r => new + { + Created = r.Created, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + }) + .Where(r => r.RowNum == 1) + .Select( + r => new LongSnapshotRow() + { + Created = r.Created.Ticks, + PersistenceId = r.PersistenceId, + SequenceNumber = r.SequenceNumber, + Manifest = r.Manifest, + Payload = r.Payload, + SerializerId = r.SerializerId, + }).ToListAsync(); + return new SnapshotReadGroup(a, set, err: (Exception?)null); + } + catch (Exception ex) + { + return new (a, null, err: ex); + } } else { From 2410e71dfcb03e98f803525a3fa57a2be03abc23 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 18:41:37 -0400 Subject: [PATCH 5/8] Make Deser stage async --- src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index 98a45238..d5a58496 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -204,8 +204,8 @@ public ByteArraySnapshotDao( } } } - }).Select( - (ab) => + }).SelectAsync(1, + async (ab) => { var (a, b, c) = ab; if (c != null) From 9a35df230999b89eb7041df6c089331b0c6c33fc Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Sun, 11 Aug 2024 19:16:22 -0400 Subject: [PATCH 6/8] Make Snapshot Recovery stream recoverable --- src/Akka.Persistence.Sql/Config/SnapshotConfig.cs | 6 ++++++ .../Snapshot/ByteArraySnapshotDao.cs | 10 ++++++---- src/Akka.Persistence.Sql/snapshot.conf | 8 ++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs b/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs index 73950505..aa8f6f27 100644 --- a/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs +++ b/src/Akka.Persistence.Sql/Config/SnapshotConfig.cs @@ -39,8 +39,11 @@ public SnapshotConfig(Configuration.Config config) WarnOnAutoInitializeFail = config.GetBoolean("warn-on-auto-init-fail"); ReadIsolationLevel = config.GetIsolationLevel("read-isolation-level"); WriteIsolationLevel = config.GetIsolationLevel("write-isolation-level"); + MaxSubStreamsForReads = config.GetInt("max-substreams-for-reads", 8); + MaxBatchPerSubStreamRead = config.GetInt("max-batch-per-substream-read", 25); } + public string UseSharedDb { get; } /// @@ -67,5 +70,8 @@ public SnapshotConfig(Configuration.Config config) public IsolationLevel WriteIsolationLevel { get; } public IsolationLevel ReadIsolationLevel { get; } + public int MaxSubStreamsForReads { get; } + + public int MaxBatchPerSubStreamRead { get; } } } diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index d5a58496..3a218519 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -88,6 +88,8 @@ public class ByteArraySnapshotDao : ISnapshotDao private readonly IsolationLevel _writeIsolationLevel; private readonly Channel _pendingLatestChannel; private readonly Task _latestSnapStream; + private readonly int _maxSubStreamsForReads; + private readonly int _maxBatchPerSubStreamRead; public ByteArraySnapshotDao( AkkaPersistenceDataConnectionFactory connectionFactory, @@ -108,12 +110,12 @@ public ByteArraySnapshotDao( _shutdownCts = new CancellationTokenSource(); _pendingLatestChannel = Channel.CreateUnbounded(); - int maxSubStreamsForReads = 8; // TODO: Configurable - int maxRequestsPerBatch = 50; + _maxSubStreamsForReads = _snapshotConfig.MaxSubStreamsForReads; + _maxBatchPerSubStreamRead = _snapshotConfig.MaxBatchPerSubStreamRead; _latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader) - .GroupBy(maxSubStreamsForReads, a=> a.PersistenceId.GetHashCode()% maxSubStreamsForReads) + .GroupBy(_maxSubStreamsForReads, a=> a.PersistenceId.GetHashCode()% _maxSubStreamsForReads) .BatchWeighted( - maxRequestsPerBatch, + _maxBatchPerSubStreamRead, a => 1, e => { diff --git a/src/Akka.Persistence.Sql/snapshot.conf b/src/Akka.Persistence.Sql/snapshot.conf index c6b8ef8d..ae08c579 100644 --- a/src/Akka.Persistence.Sql/snapshot.conf +++ b/src/Akka.Persistence.Sql/snapshot.conf @@ -49,6 +49,14 @@ # https://learn.microsoft.com/en-us/dotnet/api/system.data.isolationlevel?#fields # Valid values: "read-committed", "read-uncommitted", "repeatable-read", "serializable", "snapshot", or "unspecified" write-isolation-level = unspecified + + # The max number of substreams for reads. + # Ideally a power of 2. + max-substreams-for-reads = 16 + + # The max number of reads per substream. + # For smaller snapshots and overall 'grouping', one can use a larger batch for throughput. + max-batch-per-substream-read = 25 default { schema-name = null From f35bfd70a514d3e5b9454715e7d0ce9103545576 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Thu, 15 Aug 2024 16:30:42 -0400 Subject: [PATCH 7/8] Add CTS support (kinda) --- .../Snapshot/ByteArraySnapshotDao.cs | 59 +++++++++++++++---- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index 3a218519..988f7fca 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -34,14 +34,16 @@ public static class SubFlowExtensions public class LatestSnapRequestEntry { - public LatestSnapRequestEntry(string persistenceId) + public LatestSnapRequestEntry(string persistenceId, CancellationToken cancellationToken) { PersistenceId = persistenceId; + CTS = cancellationToken; TCS = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); } public readonly string PersistenceId; public readonly TaskCompletionSource> TCS; + public readonly CancellationToken CTS; } public readonly record struct SnapshotReadGroup @@ -65,13 +67,17 @@ public void Deconstruct(out QueryLatestSnapSet a, out List b, o public class QueryLatestSnapSet { - public readonly Dictionary>>> Entries = new(); + public readonly Dictionary> Entries = new(); public void Add(LatestSnapRequestEntry entry) { if (Entries.TryGetValue(entry.PersistenceId, out var item) == false) { - item = Entries[entry.PersistenceId] = new List>>(); + item = Entries[entry.PersistenceId] = new List(); + } + item.Add(entry); + } + } } item.Add(entry.TCS); } @@ -185,9 +191,9 @@ public ByteArraySnapshotDao( Manifest = r.Manifest, Payload = r.Payload, SerializerId = r.SerializerId, - RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() + LatestRowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue() }) - .Where(r => r.RowNum == 1) + .Where(r => r.LatestRowNum == 1) .Select( r => new LongSnapshotRow() { @@ -216,7 +222,15 @@ public ByteArraySnapshotDao( { foreach (var taskCompletionSource in taskCompletionSourcese) { - taskCompletionSource.TrySetException(c); + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + + taskCompletionSource.TCS.TrySetException(c); + } } } } @@ -230,7 +244,14 @@ public ByteArraySnapshotDao( { foreach (var taskCompletionSource in keyValuePair.Value) { - taskCompletionSource.TrySetResult(Option.None); + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetResult(Option.None); + } } } } @@ -245,14 +266,28 @@ public ByteArraySnapshotDao( { foreach (var taskCompletionSource in toSet) { - taskCompletionSource.TrySetResult(res.Success); + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetResult(res.Success); + } } } else { foreach (var taskCompletionSource in toSet) { - taskCompletionSource.TrySetException(res.Failure.Value); + if (taskCompletionSource.CTS.IsCancellationRequested) + { + taskCompletionSource.TCS.TrySetCanceled(taskCompletionSource.CTS); + } + else + { + taskCompletionSource.TCS.TrySetException(res.Failure.Value); + } } } } @@ -260,7 +295,7 @@ public ByteArraySnapshotDao( { foreach (var taskCompletionSource in toSet) { - taskCompletionSource.TrySetException(e); + taskCompletionSource.TCS.TrySetException(e); } } } @@ -275,7 +310,7 @@ public ByteArraySnapshotDao( { foreach (var taskCompletionSource in setNo) { - taskCompletionSource.TrySetResult(Option.None); + taskCompletionSource.TCS.TrySetResult(Option.None); } } } @@ -422,7 +457,7 @@ public Task> LatestSnapshotAsync( string persistenceId, CancellationToken cancellationToken = default) { - var req = new LatestSnapRequestEntry(persistenceId); + var req = new LatestSnapRequestEntry(persistenceId, cancellationToken); if (_pendingLatestChannel.Writer.TryWrite(req)) { return req.TCS.Task; From 5729e1ca458cfe73ebe0050df519920d5f4ff0ef Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Thu, 15 Aug 2024 16:38:36 -0400 Subject: [PATCH 8/8] fix bad commit --- src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs index 988f7fca..603e8880 100644 --- a/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs +++ b/src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs @@ -78,10 +78,7 @@ public void Add(LatestSnapRequestEntry entry) item.Add(entry); } } - } - item.Add(entry.TCS); - } - } + public class ByteArraySnapshotDao : ISnapshotDao { private readonly AkkaPersistenceDataConnectionFactory _connectionFactory;