Skip to content

[DO NOT MERGE] Add transaction test app #529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Akka.Persistence.Sql.sln
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "scripts", "scripts", "{FD34
build-system\scripts\getReleaseNotes.ps1 = build-system\scripts\getReleaseNotes.ps1
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TransactionTest", "src\Examples\TransactionTest\TransactionTest.csproj", "{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -139,6 +141,10 @@ Global
{043A8917-5931-40FC-A093-21BB0AB56875}.Debug|Any CPU.Build.0 = Debug|Any CPU
{043A8917-5931-40FC-A093-21BB0AB56875}.Release|Any CPU.ActiveCfg = Release|Any CPU
{043A8917-5931-40FC-A093-21BB0AB56875}.Release|Any CPU.Build.0 = Release|Any CPU
{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -155,6 +161,7 @@ Global
{043A8917-5931-40FC-A093-21BB0AB56875} = {42F8143A-03CA-41DA-B24D-33193ABB252C}
{7537908B-6A42-4E34-889F-6004A97D4987} = {20C26B2D-59EA-421C-9F64-BFFCE3D96260}
{FD345711-9A7C-4168-8766-FCD8115C82E0} = {7537908B-6A42-4E34-889F-6004A97D4987}
{D105AF9E-AFF6-4326-AEA2-E9AD8A6B5E8E} = {42F8143A-03CA-41DA-B24D-33193ABB252C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B99E6BB8-642A-4A68-86DF-69567CBA700A}
Expand Down
1 change: 1 addition & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<PackageVersion Include="Akka.Serialization.Hyperion" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Streams" Version="$(AkkaVersion)" />
<PackageVersion Include="MathNet.Numerics" Version="5.0.0" />
<PackageVersion Include="System.Data.SqlClient" Version="4.9.0" />
</ItemGroup>
<!-- App dependencies -->
<ItemGroup>
Expand Down
43 changes: 43 additions & 0 deletions src/Examples/TransactionTest/ErrorListenerActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// -----------------------------------------------------------------------
// <copyright file="ErrorListenerActor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Event;

namespace TransactionTest
{
public class ErrorListenerActor: ReceiveActor
{
private static readonly DateTime Epoch = new (1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private readonly StreamWriter _streamWriter;

public ErrorListenerActor()
{
var timestamp = (long)(DateTime.UtcNow - Epoch).TotalSeconds;
_streamWriter = new StreamWriter(File.Open($"error-{timestamp}.log", FileMode.Create, FileAccess.Write, FileShare.Read));
Context.System.EventStream.Subscribe(Self, typeof(Error));
Context.System.EventStream.Subscribe(Self, typeof(Warning));
ReceiveAsync<Error>(
async err =>
{
await _streamWriter.WriteLineAsync(err.ToString());
});
ReceiveAsync<Warning>(
async warn =>
{
await _streamWriter.WriteLineAsync(warn.ToString());
});
}

protected override void PostStop()
{
base.PostStop();
_streamWriter.Flush();
_streamWriter.Close();
_streamWriter.Dispose();
}
}
}
102 changes: 102 additions & 0 deletions src/Examples/TransactionTest/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System.Data;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Persistence;
using Akka.Persistence.Sql.Hosting;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Data.RetryPolicy;
using LinqToDB.DataProvider.SqlServer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using TransactionTest;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

const string connectionString = "Server=localhost, 1433;Database=akka;User Id=sa;Password='Strong(!)Password';TrustServerCertificate=true;";
//const string connectionString = "Server=MY-COMPUTER\\SQLEXPRESS;Database=akka;User Id=sa;Password='Strong(!)Password';TrustServerCertificate=true;";

await Host.CreateDefaultBuilder(args)
.ConfigureLogging(
(context, logger) =>
{
logger.ClearProviders();
logger.AddConsole();
logger.SetMinimumLevel(LogLevel.Debug);
})
.ConfigureServices(
(context, services) =>
{
services.AddHostedService<StressTestService>();
services.AddAkka(
"TestSystem",
(builder, provider) =>
{
builder
.ConfigureLoggers(
logger =>
{
logger.LogLevel = Akka.Event.LogLevel.DebugLevel;
logger.ClearLoggers();
logger.AddLoggerFactory();
})
.WithSqlPersistence(
options =>
{
options.ConnectionString = connectionString;
options.ProviderName = ProviderName.SqlServer2022;
options.DataOptions = new DataOptions()
.WithOptions(
new ConnectionOptions()
.WithConnectionString(connectionString)
.WithProviderName(ProviderName.SqlServer2022))
.WithOptions( new RetryPolicyOptions()
.WithFactory(_ => new SqlServerRetryPolicy(10)));
},
options =>
{
options.ConnectionString = connectionString;
options.ProviderName = ProviderName.SqlServer2022;
options.DataOptions = new DataOptions()
.WithOptions(
new ConnectionOptions()
.WithConnectionString(connectionString)
.WithProviderName(ProviderName.SqlServer2022))
.WithOptions( new RetryPolicyOptions()
.WithFactory(_ => new SqlServerRetryPolicy(10)));
}
)
.AddHocon(
"""
akka.persistence {
journal {
auto-start-journals = [ "akka.persistence.journal.sql" ]
sql {
recovery-event-timeout = 120s
circuit-breaker {
call-timeout = 160s
}
}
}
snapshot-store {
auto-start-snapshot-stores = [ "akka.persistence.snapshot-store.sql" ]
sql {
circuit-breaker {
call-timeout = 160s
}
}
}
}
""", HoconAddMode.Prepend)
.WithActors(
(system, registry) =>
{
var actor = system.ActorOf(Props.Create(() => new ErrorListenerActor()));
registry.Register<ErrorListenerActor>(actor);
});
});
})
.UseConsoleLifetime()
.RunConsoleAsync();

80 changes: 80 additions & 0 deletions src/Examples/TransactionTest/StressTestService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// -----------------------------------------------------------------------
// <copyright file="StressTestService.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Microsoft.Extensions.Hosting;

namespace TransactionTest;

public class StressTestService: IHostedService
{
private const int TotalActors = 500;
private const int LargePayloadActorCount = 20;
private const int LargePayloadSize = 1024 * 1024;
private const int SmallPayloadSize = 1024;
private const int PersistBurstSize = 50;

private readonly Random _random = new();
private readonly IActorRef?[] _actors = new IActorRef[TotalActors];
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly ActorSystem _system;
private CancellationTokenSource? _shutdownCts;
private PeriodicTimer? _timer;
private Task? _timerTask;

public StressTestService(ActorSystem system, IHostApplicationLifetime applicationLifetime)
{
_system = system;
_applicationLifetime = applicationLifetime;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_shutdownCts = new CancellationTokenSource();
_timer = new PeriodicTimer(TimeSpan.FromMilliseconds(100));
_timerTask = Task.Run(
async () =>
{
try
{
while (await _timer.WaitForNextTickAsync(_shutdownCts.Token))
{
var startIndex = _random.Next(0, TotalActors - PersistBurstSize);
foreach (var index in Enumerable.Range(startIndex, PersistBurstSize))
{
var actor = _actors[index];
if (actor is null)
{
actor = index < LargePayloadActorCount
? _system.ActorOf(Props.Create(() => new TestActor($"p-{index}", LargePayloadSize, _applicationLifetime)), $"p-{index}")
: _system.ActorOf(Props.Create(() => new TestActor($"p-{index}", SmallPayloadSize, _applicationLifetime)), $"p-{index}");
await actor.Ask<Initialized>(Initialize.Instance, _shutdownCts.Token);
_actors[index] = actor;
}
actor.Tell(SaveEvent.Instance);
}
}
}
catch (TimeoutException)
{
// no-op
}
catch (OperationCanceledException)
{
// no-op
}
}, _shutdownCts.Token);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if(_shutdownCts is not null)
await _shutdownCts.CancelAsync();
if(_timerTask is not null)
await _timerTask;
_timer?.Dispose();
}
}
119 changes: 119 additions & 0 deletions src/Examples/TransactionTest/TestActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// -----------------------------------------------------------------------
// <copyright file="TestActor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Event;
using Akka.Persistence;
using Microsoft.Extensions.Hosting;

namespace TransactionTest;

public sealed class SaveEvent
{
public static readonly SaveEvent Instance = new ();
private SaveEvent() { }
}

public sealed class Initialize
{
public static readonly Initialize Instance = new ();
private Initialize() { }
}

public sealed class Initialized
{
public static readonly Initialized Instance = new ();
private Initialized() { }
}

public class TestActor: ReceivePersistentActor
{
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly ILoggingAdapter _log;
private int _currentIndex;
private byte[]? _payload;

public TestActor(string persistenceId, int payloadSize, IHostApplicationLifetime applicationLifetime)
{
PersistenceId = persistenceId;
_applicationLifetime = applicationLifetime;

_log = Context.GetLogger();

Recover<SnapshotOffer>(offer => _payload = (byte[])offer.Snapshot);
Recover<byte[]>(
bytes =>
{
_payload = bytes;
_currentIndex++;
});
Recover<RecoveryCompleted>(_ =>
{
_log.Info("Recovery Completed");
if (_payload == null)
{
var rnd = new Random();
_payload = new byte[payloadSize];
for (var i = 0; i < payloadSize; i++)
{
_payload[i] = (byte)rnd.Next(0, 255);
}
}
});
Command<Initialize>(_ => Sender.Tell(Initialized.Instance, Self));
Command<SaveEvent>(_ =>
{
Persist(_payload,
_ =>
{
_currentIndex++;
if (_currentIndex % 10 == 0 || _currentIndex > 10)
{
SaveSnapshot(_payload);
}
});
});
Command<SaveSnapshotSuccess>(
evt =>
{
DeleteMessages(evt.Metadata.SequenceNr - 1);
});
Command<SaveSnapshotFailure>(
fail =>
{
_log.Error(fail.Cause, "Failed to save snapshot");
// _applicationLifetime.StopApplication();
});
Command<DeleteMessagesSuccess>(
_ =>
{
// no-op
// log.Info("Messages deleted");
});
Command<DeleteMessagesFailure>(
fail =>
{
_log.Error(fail.Cause, "Failed to delete messages");
// _applicationLifetime.StopApplication();
});
}

public override string PersistenceId { get; }

/*
protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr)
{
base.OnPersistFailure(cause, @event, sequenceNr);
if(cause is not TimeoutException)
_applicationLifetime.StopApplication();
}

protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
{
base.OnPersistRejected(cause, @event, sequenceNr);
_applicationLifetime.StopApplication();
}
*/
}
Loading