From 2a4ba06be6805aac5fcbb8bed108d16469520b13 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 10 Feb 2023 21:29:18 +0700 Subject: [PATCH 1/4] Merge branch 'dev' into add_batched_migration_script # Conflicts: # Akka.Persistence.Sql.sln # scripts/PostgreSql/1_Migration_Setup.sql --- scripts/MySql/Batched/1_Migration_Setup.sql | 91 +++++++++++++++++++ .../PostgreSql/Batched/1_Migration_Setup.sql | 55 +++++++++++ scripts/SqlServer/2_Migration.sql | 6 +- 3 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 scripts/MySql/Batched/1_Migration_Setup.sql create mode 100644 scripts/PostgreSql/Batched/1_Migration_Setup.sql diff --git a/scripts/MySql/Batched/1_Migration_Setup.sql b/scripts/MySql/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..26aaceb8 --- /dev/null +++ b/scripts/MySql/Batched/1_Migration_Setup.sql @@ -0,0 +1,91 @@ +CREATE TABLE IF NOT EXISTS tags( + ordering_id BIGINT NOT NULL, + tag NVARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255), + PRIMARY KEY (ordering_id, tag, persistence_id) +); + +DROP PROCEDURE IF EXISTS Split; + +DELIMITER ?? +CREATE PROCEDURE Split(IN fromId INT, IN toId INT) +BEGIN + + DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; + DECLARE Id INT UNSIGNED; + DECLARE String VARCHAR(8000); + DECLARE PId VARCHAR(255); + DECLARE SeqNr INT UNSIGNED; + DECLARE idx INT UNSIGNED; + DECLARE slice VARCHAR(8000); + + DECLARE v_cursor CURSOR FOR + SELECT ej.`ordering`, ej.tags + FROM event_journal ej + WHERE ej.`ordering` >= fromId AND ej.`ordering` <= toId + ORDER BY ej.`ordering`; + DECLARE CONTINUE HANDLER FOR NOT FOUND + SET v_cursor_done = 1; + + OPEN v_cursor; + REPEAT + FETCH v_cursor INTO Id, String, SeqNr, PId; + SET idx = 1; + + IF String IS NULL OR LENGTH(String) < 1 THEN + SET idx = 0; + END IF; + + WHILE idx != 0 DO + SET idx = LOCATE(';', String); + IF idx != 0 THEN + SET slice = LEFT(String, idx - 1); + ELSE + SET slice = String; + END IF; + + IF LENGTH(slice) > 0 THEN + INSERT IGNORE INTO tags (ordering_id, tag, sequence_nr, persistence_id) VALUES (Id, slice, SeqNr, PId); + END IF; + + SET String = RIGHT(String, LENGTH(String) - idx); + + IF LENGTH(String) = 0 THEN + SET idx = 0; + END IF; + END WHILE; + UNTIL v_cursor_done END REPEAT; + + CLOSE v_cursor; + +END?? + +CREATE PROCEDURE BatchedMigration(IN fromId BIGINT) +BEGIN + DECLARE maxId BIGINT UNSIGNED; + DECLARE oldCommitValue TINYINT DEFAULT @@autocommit; + + SELECT maxId = MAX(ej.`ordering`) + FROM event_journal ej + WHERE + ej.`tags` IS NOT null + AND LENGTH(ej.`tags`) > 0 + AND ej.`ordering` NOT IN (SELECT t.`ordering_id` FROM tags t); + + loopLabel: LOOP + IF fromId > maxId THEN + LEAVE loopLabel; + END IF; + + SET autocommit = 0; + START TRANSACTION; + CALL Split(fromId, fromId + 1000); + COMMIT; + SET autocommit = oldCommitValue; + + SET fromId = fromId + 1000; + END LOOP; +END ?? + +DELIMITER ; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/1_Migration_Setup.sql b/scripts/PostgreSql/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..8f0d231e --- /dev/null +++ b/scripts/PostgreSql/Batched/1_Migration_Setup.sql @@ -0,0 +1,55 @@ +CREATE TABLE IF NOT EXISTS "public"."tags"( + ordering_id BIGINT NOT NULL, + tag VARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + PRIMARY KEY (ordering_id, tag) +); + +CREATE OR REPLACE PROCEDURE "public"."Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ +DECLARE var_t record; +BEGIN + FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) + LOOP + CONTINUE WHEN var_t.t IS NULL OR var_t.t = ''; + INSERT INTO "public"."tags" (ordering_id, tag, sequence_nr, persistence_id) + VALUES (id, var_t.t, seq_nr, pid) + ON CONFLICT DO NOTHING; + END LOOP; +END +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE "public"."Normalize"(IN fromId BIGINT, IN toId BIGINT) AS $$ +DECLARE var_r record; +BEGIN + FOR var_r IN( + SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid + FROM "public"."event_journal" AS ej + WHERE ej.ordering >= fromId AND ej.ordering <= toId + ORDER BY "ordering") + LOOP + CALL "public"."Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); + END LOOP; +END +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE "public"."BatchedMigration"(IN from_id BIGINT) AS $$ +DECLARE max_id BIGINT; +BEGIN + max_id := (SELECT MAX(ej."ordering") + FROM event_journal ej + WHERE + ej."tags" IS NOT NULL + AND LENGTH(ej."tags") > 0 + AND ej."ordering" NOT IN (SELECT t."ordering_id" FROM "public"."tags" t)); + + LOOP + EXIT WHEN from_id > max_id; + + CALL "public"."Normalize"(from_id, from_id + 1000); + COMMIT; + + from_id := from_id + 1000; + END LOOP; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/scripts/SqlServer/2_Migration.sql b/scripts/SqlServer/2_Migration.sql index 55fc033b..86d9052a 100644 --- a/scripts/SqlServer/2_Migration.sql +++ b/scripts/SqlServer/2_Migration.sql @@ -1,8 +1,8 @@ INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id]) SELECT * FROM ( - SELECT a.[Ordering], b.[items], a.SequenceNr, a.PersistenceId FROM - [dbo].[EventJournal] AS a - CROSS APPLY [dbo].[Split](a.Tags, ';') b + SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM + [dbo].[EventJournal] AS records + CROSS APPLY [dbo].[Split](records.Tags, ';') cross_product ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) WHERE NOT EXISTS ( SELECT * FROM [dbo].[tags] t WITH (updlock) From 364f4a0bf1c008e07df0f6bcc928e5940b889652 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 10 Feb 2023 21:29:51 +0700 Subject: [PATCH 2/4] Implement TagTable changes to migration script (cherry-picked from 4d9ef437db8011ceb161f269283d9042c3f18eaa) --- .../SqlServer/Batched/1_Migration_Setup.sql | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 scripts/SqlServer/Batched/1_Migration_Setup.sql diff --git a/scripts/SqlServer/Batched/1_Migration_Setup.sql b/scripts/SqlServer/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..da7983ae --- /dev/null +++ b/scripts/SqlServer/Batched/1_Migration_Setup.sql @@ -0,0 +1,71 @@ +IF NOT EXISTS(SELECT * FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = N'dbo' AND TABLE_NAME = N'tags') +BEGIN + CREATE TABLE [dbo].[tags]( + ordering_id BIGINT NOT NULL, + tag NVARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + PRIMARY KEY (ordering_id, tag, persistence_id) + ); +END +GO + +CREATE OR ALTER FUNCTION [dbo].[Split](@String VARCHAR(8000), @Delimiter CHAR(1)) + RETURNS @temptable TABLE (items VARCHAR(8000)) AS +BEGIN + DECLARE @idx INT + DECLARE @slice VARCHAR(8000) + + SELECT @idx = 1 + IF LEN(@String) < 1 OR @String is NULL + RETURN + + WHILE @idx != 0 + BEGIN + SET @idx = CHARINDEX(@Delimiter, @String) + IF @idx != 0 + SET @slice = LEFT(@String,@idx - 1) + ELSE + SET @slice = @String + + IF(LEN(@slice) > 0) + INSERT INTO @temptable(Items) VALUES(@slice) + + SET @String = RIGHT(@String, LEN(@String) - @idx) + IF len(@String) = 0 + BREAK + END + RETURN +END; +GO + +CREATE OR ALTER PROCEDURE [dbo].[BatchedMigration](@from_id BIGINT) AS +BEGIN + DECLARE @max_id BIGINT; + + SELECT @max_id = MAX(ej.[Ordering]) + FROM [dbo].[EventJournal] ej + WHERE + ej.[Tags] IS NOT NULL + AND LEN(ej.[Tags]) > 0 + AND ej.[Ordering] NOT IN (SELECT t.[ordering_id] FROM [dbo].[tags] t); + + WHILE @from_id <= @max_id + BEGIN + BEGIN TRAN; + INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id]) + SELECT * FROM ( + SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM + [dbo].[EventJournal] AS records + CROSS APPLY [dbo].[Split](records.Tags, ';') cross_product + ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) + WHERE NOT EXISTS ( + SELECT * FROM [dbo].[tags] t WITH (updlock) + WHERE s.[ordering_id] = t.[ordering_id] AND s.[tag] = t.[tag] + ); + COMMIT TRAN; + + SET @from_id = @from_id + 1000; + END +END; From 46ee07bff5378a418e2899f56c01f04191f18b0c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 10 Feb 2023 21:48:25 +0700 Subject: [PATCH 3/4] Replace function/procedure names with a more unique name (cherry-picked from 000a68c5452128d49950da0be55f7bc803fbb760) --- scripts/MySql/1_Migration_Setup.sql | 4 +- scripts/MySql/2_Migration.sql | 2 +- scripts/MySql/3_Post_Migration_Cleanup.sql | 2 +- scripts/MySql/Batched/1_Migration_Setup.sql | 8 +- scripts/MySql/Batched/2_Migration.sql | 1 + .../Batched/3_Post_Migration_Cleanup.sql | 2 + scripts/MySql/Batched/DROP_migration.sql | 3 + scripts/MySql/DROP_migration.sql | 2 +- scripts/PostgreSql/1_Migration_Setup.sql | 10 +- scripts/PostgreSql/2_Migration.sql | 2 +- .../PostgreSql/3_Post_Migration_Cleanup.sql | 4 +- .../PostgreSql/Batched/1_Migration_Setup.sql | 10 +- scripts/PostgreSql/Batched/2_Migration.sql | 1 + .../Batched/3_Post_Migration_Cleanup.sql | 3 + scripts/PostgreSql/Batched/DROP_migration.sql | 4 + scripts/PostgreSql/DROP_migration.sql | 4 +- scripts/SqlServer/1_Migration_Setup.sql | 2 +- scripts/SqlServer/2_Migration.sql | 2 +- .../SqlServer/3_Post_Migration_Cleanup.sql | 2 +- .../SqlServer/Batched/1_Migration_Setup.sql | 6 +- scripts/SqlServer/Batched/2_Migration.sql | 1 + .../Batched/3_Post_Migration_Cleanup.sql | 2 + scripts/SqlServer/Batched/DROP_Migration.sql | 3 + scripts/SqlServer/DROP_Migration.sql | 2 +- .../Akka.Persistence.Linq2Db.HelperLib.csproj | 18 +++ .../JournalIndexHelper.cs | 49 ++++++ .../TagTableMigrator.cs | 143 ++++++++++++++++++ 27 files changed, 261 insertions(+), 31 deletions(-) create mode 100644 scripts/MySql/Batched/2_Migration.sql create mode 100644 scripts/MySql/Batched/3_Post_Migration_Cleanup.sql create mode 100644 scripts/MySql/Batched/DROP_migration.sql create mode 100644 scripts/PostgreSql/Batched/2_Migration.sql create mode 100644 scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql create mode 100644 scripts/PostgreSql/Batched/DROP_migration.sql create mode 100644 scripts/SqlServer/Batched/2_Migration.sql create mode 100644 scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql create mode 100644 scripts/SqlServer/Batched/DROP_Migration.sql create mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj create mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs create mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs diff --git a/scripts/MySql/1_Migration_Setup.sql b/scripts/MySql/1_Migration_Setup.sql index 20416b80..ec3af2c4 100644 --- a/scripts/MySql/1_Migration_Setup.sql +++ b/scripts/MySql/1_Migration_Setup.sql @@ -6,10 +6,10 @@ CREATE TABLE IF NOT EXISTS tags( PRIMARY KEY (ordering_id, tag, persistence_id) ); -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; DELIMITER ?? -CREATE PROCEDURE Split() +CREATE PROCEDURE AkkaMigration_Split() BEGIN DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; diff --git a/scripts/MySql/2_Migration.sql b/scripts/MySql/2_Migration.sql index 56ca2d4e..f9ef335b 100644 --- a/scripts/MySql/2_Migration.sql +++ b/scripts/MySql/2_Migration.sql @@ -1 +1 @@ -CALL Split(); \ No newline at end of file +CALL AkkaMigration_Split(); \ No newline at end of file diff --git a/scripts/MySql/3_Post_Migration_Cleanup.sql b/scripts/MySql/3_Post_Migration_Cleanup.sql index 74cc5203..58337716 100644 --- a/scripts/MySql/3_Post_Migration_Cleanup.sql +++ b/scripts/MySql/3_Post_Migration_Cleanup.sql @@ -1 +1 @@ -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; diff --git a/scripts/MySql/Batched/1_Migration_Setup.sql b/scripts/MySql/Batched/1_Migration_Setup.sql index 26aaceb8..453a486c 100644 --- a/scripts/MySql/Batched/1_Migration_Setup.sql +++ b/scripts/MySql/Batched/1_Migration_Setup.sql @@ -6,10 +6,10 @@ CREATE TABLE IF NOT EXISTS tags( PRIMARY KEY (ordering_id, tag, persistence_id) ); -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; DELIMITER ?? -CREATE PROCEDURE Split(IN fromId INT, IN toId INT) +CREATE PROCEDURE AkkaMigration_Split(IN fromId INT, IN toId INT) BEGIN DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; @@ -61,7 +61,7 @@ BEGIN END?? -CREATE PROCEDURE BatchedMigration(IN fromId BIGINT) +CREATE PROCEDURE AkkaMigration_BatchedMigration(IN fromId BIGINT) BEGIN DECLARE maxId BIGINT UNSIGNED; DECLARE oldCommitValue TINYINT DEFAULT @@autocommit; @@ -80,7 +80,7 @@ BEGIN SET autocommit = 0; START TRANSACTION; - CALL Split(fromId, fromId + 1000); + CALL AkkaMigration_Split(fromId, fromId + 1000); COMMIT; SET autocommit = oldCommitValue; diff --git a/scripts/MySql/Batched/2_Migration.sql b/scripts/MySql/Batched/2_Migration.sql new file mode 100644 index 00000000..7857f995 --- /dev/null +++ b/scripts/MySql/Batched/2_Migration.sql @@ -0,0 +1 @@ +CALL AkkaMigration_BatchedMigration(0); \ No newline at end of file diff --git a/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql b/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..3df6af9a --- /dev/null +++ b/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,2 @@ +DROP PROCEDURE IF EXISTS AkkaMigration_Split; +DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration; \ No newline at end of file diff --git a/scripts/MySql/Batched/DROP_migration.sql b/scripts/MySql/Batched/DROP_migration.sql new file mode 100644 index 00000000..17e2faf0 --- /dev/null +++ b/scripts/MySql/Batched/DROP_migration.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS TagTable; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; +DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration; \ No newline at end of file diff --git a/scripts/MySql/DROP_migration.sql b/scripts/MySql/DROP_migration.sql index 7fd4c24e..d4ebd04c 100644 --- a/scripts/MySql/DROP_migration.sql +++ b/scripts/MySql/DROP_migration.sql @@ -1,2 +1,2 @@ DROP TABLE IF EXISTS TagTable; -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; diff --git a/scripts/PostgreSql/1_Migration_Setup.sql b/scripts/PostgreSql/1_Migration_Setup.sql index ee6096c9..8fbe76a6 100644 --- a/scripts/PostgreSql/1_Migration_Setup.sql +++ b/scripts/PostgreSql/1_Migration_Setup.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS "public"."tags"( PRIMARY KEY (ordering_id, tag, persistence_id) ); -CREATE OR REPLACE PROCEDURE "public"."Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ DECLARE var_t record; BEGIN FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) @@ -19,12 +19,12 @@ BEGIN END $$ LANGUAGE plpgsql; -CREATE OR REPLACE PROCEDURE "public"."Normalize"() AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"() AS $$ DECLARE var_r record; BEGIN - FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering") - LOOP - CALL "public"."Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); + FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering") + LOOP + CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); END LOOP; END $$ LANGUAGE plpgsql; diff --git a/scripts/PostgreSql/2_Migration.sql b/scripts/PostgreSql/2_Migration.sql index 64949092..20850ce8 100644 --- a/scripts/PostgreSql/2_Migration.sql +++ b/scripts/PostgreSql/2_Migration.sql @@ -1 +1 @@ -CALL "public"."Normalize"(); \ No newline at end of file +CALL "public"."AkkaMigration_Normalize"(); \ No newline at end of file diff --git a/scripts/PostgreSql/3_Post_Migration_Cleanup.sql b/scripts/PostgreSql/3_Post_Migration_Cleanup.sql index e6b2f6f2..77f5f5e7 100644 --- a/scripts/PostgreSql/3_Post_Migration_Cleanup.sql +++ b/scripts/PostgreSql/3_Post_Migration_Cleanup.sql @@ -1,2 +1,2 @@ -DROP PROCEDURE IF EXISTS "public"."Split"; -DROP PROCEDURE IF EXISTS "public"."Normalize"; \ No newline at end of file +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/1_Migration_Setup.sql b/scripts/PostgreSql/Batched/1_Migration_Setup.sql index 8f0d231e..99c6bf0e 100644 --- a/scripts/PostgreSql/Batched/1_Migration_Setup.sql +++ b/scripts/PostgreSql/Batched/1_Migration_Setup.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS "public"."tags"( PRIMARY KEY (ordering_id, tag) ); -CREATE OR REPLACE PROCEDURE "public"."Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ DECLARE var_t record; BEGIN FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) @@ -19,7 +19,7 @@ BEGIN END $$ LANGUAGE plpgsql; -CREATE OR REPLACE PROCEDURE "public"."Normalize"(IN fromId BIGINT, IN toId BIGINT) AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"(IN fromId BIGINT, IN toId BIGINT) AS $$ DECLARE var_r record; BEGIN FOR var_r IN( @@ -28,12 +28,12 @@ BEGIN WHERE ej.ordering >= fromId AND ej.ordering <= toId ORDER BY "ordering") LOOP - CALL "public"."Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); + CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); END LOOP; END $$ LANGUAGE plpgsql; -CREATE OR REPLACE PROCEDURE "public"."BatchedMigration"(IN from_id BIGINT) AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_BatchedMigration"(IN from_id BIGINT) AS $$ DECLARE max_id BIGINT; BEGIN max_id := (SELECT MAX(ej."ordering") @@ -46,7 +46,7 @@ BEGIN LOOP EXIT WHEN from_id > max_id; - CALL "public"."Normalize"(from_id, from_id + 1000); + CALL "public"."AkkaMigration_Normalize"(from_id, from_id + 1000); COMMIT; from_id := from_id + 1000; diff --git a/scripts/PostgreSql/Batched/2_Migration.sql b/scripts/PostgreSql/Batched/2_Migration.sql new file mode 100644 index 00000000..5da5d3f7 --- /dev/null +++ b/scripts/PostgreSql/Batched/2_Migration.sql @@ -0,0 +1 @@ +CALL "public"."AkkaMigration_BatchedMigration"(0); \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql b/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..5a02e65e --- /dev/null +++ b/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,3 @@ +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration"; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/DROP_migration.sql b/scripts/PostgreSql/Batched/DROP_migration.sql new file mode 100644 index 00000000..d5c7549f --- /dev/null +++ b/scripts/PostgreSql/Batched/DROP_migration.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS "public"."TagTable"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration"; \ No newline at end of file diff --git a/scripts/PostgreSql/DROP_migration.sql b/scripts/PostgreSql/DROP_migration.sql index e7c3046b..47ce4af0 100644 --- a/scripts/PostgreSql/DROP_migration.sql +++ b/scripts/PostgreSql/DROP_migration.sql @@ -1,3 +1,3 @@ DROP TABLE IF EXISTS "public"."TagTable"; -DROP PROCEDURE IF EXISTS "public"."Split"; -DROP PROCEDURE IF EXISTS "public"."Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; diff --git a/scripts/SqlServer/1_Migration_Setup.sql b/scripts/SqlServer/1_Migration_Setup.sql index 64f97e9f..20eecc6e 100644 --- a/scripts/SqlServer/1_Migration_Setup.sql +++ b/scripts/SqlServer/1_Migration_Setup.sql @@ -11,7 +11,7 @@ BEGIN END GO -CREATE OR ALTER FUNCTION [dbo].[Split](@String VARCHAR(8000), @Delimiter CHAR(1)) +CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1)) RETURNS @temptable TABLE (items VARCHAR(8000)) AS BEGIN DECLARE @idx INT diff --git a/scripts/SqlServer/2_Migration.sql b/scripts/SqlServer/2_Migration.sql index 86d9052a..550f4e9a 100644 --- a/scripts/SqlServer/2_Migration.sql +++ b/scripts/SqlServer/2_Migration.sql @@ -2,7 +2,7 @@ INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id]) SELECT * FROM ( SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM [dbo].[EventJournal] AS records - CROSS APPLY [dbo].[Split](records.Tags, ';') cross_product + CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) WHERE NOT EXISTS ( SELECT * FROM [dbo].[tags] t WITH (updlock) diff --git a/scripts/SqlServer/3_Post_Migration_Cleanup.sql b/scripts/SqlServer/3_Post_Migration_Cleanup.sql index b46a9195..575e5cc3 100644 --- a/scripts/SqlServer/3_Post_Migration_Cleanup.sql +++ b/scripts/SqlServer/3_Post_Migration_Cleanup.sql @@ -1 +1 @@ -DROP FUNCTION IF EXISTS [dbo].[Split]; \ No newline at end of file +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/1_Migration_Setup.sql b/scripts/SqlServer/Batched/1_Migration_Setup.sql index da7983ae..c6394bb0 100644 --- a/scripts/SqlServer/Batched/1_Migration_Setup.sql +++ b/scripts/SqlServer/Batched/1_Migration_Setup.sql @@ -11,7 +11,7 @@ BEGIN END GO -CREATE OR ALTER FUNCTION [dbo].[Split](@String VARCHAR(8000), @Delimiter CHAR(1)) +CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1)) RETURNS @temptable TABLE (items VARCHAR(8000)) AS BEGIN DECLARE @idx INT @@ -40,7 +40,7 @@ BEGIN END; GO -CREATE OR ALTER PROCEDURE [dbo].[BatchedMigration](@from_id BIGINT) AS +CREATE OR ALTER PROCEDURE [dbo].[AkkaMigration_BatchedMigration](@from_id BIGINT) AS BEGIN DECLARE @max_id BIGINT; @@ -58,7 +58,7 @@ BEGIN SELECT * FROM ( SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM [dbo].[EventJournal] AS records - CROSS APPLY [dbo].[Split](records.Tags, ';') cross_product + CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) WHERE NOT EXISTS ( SELECT * FROM [dbo].[tags] t WITH (updlock) diff --git a/scripts/SqlServer/Batched/2_Migration.sql b/scripts/SqlServer/Batched/2_Migration.sql new file mode 100644 index 00000000..2b04066c --- /dev/null +++ b/scripts/SqlServer/Batched/2_Migration.sql @@ -0,0 +1 @@ +EXEC [dbo].[AkkaMigration_BatchedMigration] @from_id = 0; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql b/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..8b2a02b3 --- /dev/null +++ b/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,2 @@ +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; +DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration]; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/DROP_Migration.sql b/scripts/SqlServer/Batched/DROP_Migration.sql new file mode 100644 index 00000000..2d6f23ff --- /dev/null +++ b/scripts/SqlServer/Batched/DROP_Migration.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS [dbo].[TagTable]; +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; +DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration]; \ No newline at end of file diff --git a/scripts/SqlServer/DROP_Migration.sql b/scripts/SqlServer/DROP_Migration.sql index e3cf6a35..3b89c8a7 100644 --- a/scripts/SqlServer/DROP_Migration.sql +++ b/scripts/SqlServer/DROP_Migration.sql @@ -1,2 +1,2 @@ DROP TABLE IF EXISTS [dbo].[TagTable]; -DROP FUNCTION IF EXISTS [dbo].[Split]; \ No newline at end of file +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj b/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj new file mode 100644 index 00000000..1a873e96 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj @@ -0,0 +1,18 @@ + + + + + netstandard2.0 + 10 + + + + + + + + + + + + diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs b/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs new file mode 100644 index 00000000..6db1e14b --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs @@ -0,0 +1,49 @@ +using FluentMigrator.Model; + +namespace Akka.Persistence.Linq2Db.HelperLib +{ + public class JournalIndexHelper + { + public IndexDefinition DefaultJournalIndex(string tableName, string persistenceIdCol, string sequenceNoCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName, $"UX_{tableName}_PID_SEQNO"); + //short name for easy compat with all dbs. (*cough* oracle *cough*) + idx.Columns.Add(new IndexColumnDefinition(){ Name = persistenceIdCol }); + idx.Columns.Add(new IndexColumnDefinition(){Name = sequenceNoCol, Direction = Direction.Ascending}); + idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalOrdering(string tableName, + string orderingCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName,$"IX_{tableName}_Ordering"); + idx.Columns.Add(new IndexColumnDefinition(){Name = orderingCol}); + //Should it be? + //idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalTimestamp(string tableName, + string timestampCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName, + $"IX_{tableName}_TimeStamp"); + idx.Columns.Add(new IndexColumnDefinition(){Name = timestampCol}); + //Not unique by any stretch. + return idx; + } + + private static IndexDefinition BeginCreateIndex(string tableName, string schemaName, string indexName) + { + var idx = new IndexDefinition(); + if (string.IsNullOrWhiteSpace(schemaName) == false) + { + idx.SchemaName = schemaName; + } + idx.TableName = tableName; + idx.Name = indexName; + return idx; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs b/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs new file mode 100644 index 00000000..d33e051e --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs @@ -0,0 +1,143 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.Persistence.Sql.Linq2Db; +using Akka.Persistence.Sql.Linq2Db.Config; +using Akka.Persistence.Sql.Linq2Db.Db; +using Akka.Persistence.Sql.Linq2Db.Journal.Types; +using LinqToDB; +using LinqToDB.Data; +using LinqToDB.Tools; + +namespace Akka.Persistence.Linq2Db.HelperLib +{ + public class TagTableMigrator + { + private readonly AkkaPersistenceDataConnectionFactory _connectionFactory; + private readonly JournalConfig _journalConfig; + private readonly string _separator; + + public TagTableMigrator(Config config) + { + config = config + .WithFallback(Linq2DbPersistence.DefaultConfiguration) + .GetConfig("akka.persistence.journal.linq2db"); + + var mapping = config.GetString("table-mapping"); + if(string.IsNullOrWhiteSpace(mapping) || mapping == "default") + throw new ConfigurationException("akka.persistence.journal.linq2db.table-mapping must not be empty or 'default'"); + + _journalConfig = new JournalConfig(config); + if (_journalConfig.TableConfig.TagWriteMode != TagWriteMode.Both) + throw new ConfigurationException("akka.persistence.journal.linq2db.tag-write-mode has to be 'Both'"); + + _connectionFactory = new AkkaPersistenceDataConnectionFactory(_journalConfig); + _separator = _journalConfig.PluginConfig.TagSeparator; + } + + public async Task Migrate(long startOffset, int batchSize, long? endOffset = null) + { + var config = _journalConfig.DaoConfig; + + await using var db = _connectionFactory.GetConnection(); + + // Create the tag table if it doesn't exist + var schemaProvider = db.DataProvider.GetSchemaProvider(); + var dbSchema = schemaProvider.GetSchema(db); + if (dbSchema.Tables.All(t => t.TableName != _journalConfig.TableConfig.TagTable.Name)) + { + await db.CreateTableAsync(); + } + + long maxId; + if (endOffset is null) + { + var jtrQuery = db.GetTable() + .Select(jtr => jtr.OrderingId) + .Distinct(); + + maxId = await db.GetTable() + .Where(r => + r.Tags != null + && r.Tags.Length > 0 + && r.Ordering.NotIn(jtrQuery)) + .Select(r => r.Ordering) + .OrderByDescending(r => r) + .FirstOrDefaultAsync(); + } + else + { + maxId = endOffset.Value; + } + + while (startOffset <= maxId) + { + await using (var tx = await db.BeginTransactionAsync(IsolationLevel.ReadCommitted)) + { + try + { + var offset = startOffset; + var rows = await db.GetTable() + .Where(r => + r.Ordering >= offset + && r.Ordering < offset + batchSize + && r.Tags != null + && r.Tags.Length > 0) + .ToListAsync(); + + var tagList = new List(); + foreach (var row in rows) + { + var tags = row.Tags + .Split(new [] {_separator}, StringSplitOptions.RemoveEmptyEntries) + .Where(s => !string.IsNullOrWhiteSpace(s)); + + tagList.AddRange(tags.Select(tag => new JournalTagRow + { + OrderingId = row.Ordering, + TagValue = tag, + SequenceNumber = row.SequenceNumber, + PersistenceId = row.PersistenceId + })); + } + + await db.GetTable().BulkCopyAsync(new BulkCopyOptions + { + BulkCopyType = BulkCopyType.MultipleRows, + UseParameters = config.PreferParametersOnMultiRowInsert, + MaxBatchSize = config.DbRoundTripTagBatchSize + }, tagList); + + await tx.CommitAsync(); + } + catch (Exception e1) + { + try + { + await tx.RollbackAsync(); + } + catch (Exception e2) + { + throw new AggregateException( + $"Migration failed on offset {startOffset} to {startOffset + batchSize}, Rollback failed.", + e2, e1); + } + throw new Exception( + $"Migration failed on offset {startOffset} to {startOffset + batchSize}, Rollback successful.", + e1); + } + } + startOffset += batchSize; + } + } + } +} \ No newline at end of file From fe7f514fb440614dca15de4f08fb53be1854827f Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 10 Feb 2023 22:29:50 +0700 Subject: [PATCH 4/4] Delete mis-commited files (cherry-picked from fba47172f267f11e7b3baa8cce986da136a75a38) --- .../Akka.Persistence.Linq2Db.HelperLib.csproj | 18 --- .../JournalIndexHelper.cs | 49 ------ .../TagTableMigrator.cs | 143 ------------------ 3 files changed, 210 deletions(-) delete mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj delete mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs delete mode 100644 src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj b/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj deleted file mode 100644 index 1a873e96..00000000 --- a/src/Akka.Persistence.Linq2Db.HelperLib/Akka.Persistence.Linq2Db.HelperLib.csproj +++ /dev/null @@ -1,18 +0,0 @@ - - - - - netstandard2.0 - 10 - - - - - - - - - - - - diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs b/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs deleted file mode 100644 index 6db1e14b..00000000 --- a/src/Akka.Persistence.Linq2Db.HelperLib/JournalIndexHelper.cs +++ /dev/null @@ -1,49 +0,0 @@ -using FluentMigrator.Model; - -namespace Akka.Persistence.Linq2Db.HelperLib -{ - public class JournalIndexHelper - { - public IndexDefinition DefaultJournalIndex(string tableName, string persistenceIdCol, string sequenceNoCol, string schemaName = null) - { - var idx = BeginCreateIndex(tableName, schemaName, $"UX_{tableName}_PID_SEQNO"); - //short name for easy compat with all dbs. (*cough* oracle *cough*) - idx.Columns.Add(new IndexColumnDefinition(){ Name = persistenceIdCol }); - idx.Columns.Add(new IndexColumnDefinition(){Name = sequenceNoCol, Direction = Direction.Ascending}); - idx.IsUnique = true; - return idx; - } - - public IndexDefinition JournalOrdering(string tableName, - string orderingCol, string schemaName = null) - { - var idx = BeginCreateIndex(tableName, schemaName,$"IX_{tableName}_Ordering"); - idx.Columns.Add(new IndexColumnDefinition(){Name = orderingCol}); - //Should it be? - //idx.IsUnique = true; - return idx; - } - - public IndexDefinition JournalTimestamp(string tableName, - string timestampCol, string schemaName = null) - { - var idx = BeginCreateIndex(tableName, schemaName, - $"IX_{tableName}_TimeStamp"); - idx.Columns.Add(new IndexColumnDefinition(){Name = timestampCol}); - //Not unique by any stretch. - return idx; - } - - private static IndexDefinition BeginCreateIndex(string tableName, string schemaName, string indexName) - { - var idx = new IndexDefinition(); - if (string.IsNullOrWhiteSpace(schemaName) == false) - { - idx.SchemaName = schemaName; - } - idx.TableName = tableName; - idx.Name = indexName; - return idx; - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs b/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs deleted file mode 100644 index d33e051e..00000000 --- a/src/Akka.Persistence.Linq2Db.HelperLib/TagTableMigrator.cs +++ /dev/null @@ -1,143 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2013-2023 .NET Foundation -// -// ----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Data; -using System.Linq; -using System.Threading.Tasks; -using Akka.Configuration; -using Akka.Persistence.Sql.Linq2Db; -using Akka.Persistence.Sql.Linq2Db.Config; -using Akka.Persistence.Sql.Linq2Db.Db; -using Akka.Persistence.Sql.Linq2Db.Journal.Types; -using LinqToDB; -using LinqToDB.Data; -using LinqToDB.Tools; - -namespace Akka.Persistence.Linq2Db.HelperLib -{ - public class TagTableMigrator - { - private readonly AkkaPersistenceDataConnectionFactory _connectionFactory; - private readonly JournalConfig _journalConfig; - private readonly string _separator; - - public TagTableMigrator(Config config) - { - config = config - .WithFallback(Linq2DbPersistence.DefaultConfiguration) - .GetConfig("akka.persistence.journal.linq2db"); - - var mapping = config.GetString("table-mapping"); - if(string.IsNullOrWhiteSpace(mapping) || mapping == "default") - throw new ConfigurationException("akka.persistence.journal.linq2db.table-mapping must not be empty or 'default'"); - - _journalConfig = new JournalConfig(config); - if (_journalConfig.TableConfig.TagWriteMode != TagWriteMode.Both) - throw new ConfigurationException("akka.persistence.journal.linq2db.tag-write-mode has to be 'Both'"); - - _connectionFactory = new AkkaPersistenceDataConnectionFactory(_journalConfig); - _separator = _journalConfig.PluginConfig.TagSeparator; - } - - public async Task Migrate(long startOffset, int batchSize, long? endOffset = null) - { - var config = _journalConfig.DaoConfig; - - await using var db = _connectionFactory.GetConnection(); - - // Create the tag table if it doesn't exist - var schemaProvider = db.DataProvider.GetSchemaProvider(); - var dbSchema = schemaProvider.GetSchema(db); - if (dbSchema.Tables.All(t => t.TableName != _journalConfig.TableConfig.TagTable.Name)) - { - await db.CreateTableAsync(); - } - - long maxId; - if (endOffset is null) - { - var jtrQuery = db.GetTable() - .Select(jtr => jtr.OrderingId) - .Distinct(); - - maxId = await db.GetTable() - .Where(r => - r.Tags != null - && r.Tags.Length > 0 - && r.Ordering.NotIn(jtrQuery)) - .Select(r => r.Ordering) - .OrderByDescending(r => r) - .FirstOrDefaultAsync(); - } - else - { - maxId = endOffset.Value; - } - - while (startOffset <= maxId) - { - await using (var tx = await db.BeginTransactionAsync(IsolationLevel.ReadCommitted)) - { - try - { - var offset = startOffset; - var rows = await db.GetTable() - .Where(r => - r.Ordering >= offset - && r.Ordering < offset + batchSize - && r.Tags != null - && r.Tags.Length > 0) - .ToListAsync(); - - var tagList = new List(); - foreach (var row in rows) - { - var tags = row.Tags - .Split(new [] {_separator}, StringSplitOptions.RemoveEmptyEntries) - .Where(s => !string.IsNullOrWhiteSpace(s)); - - tagList.AddRange(tags.Select(tag => new JournalTagRow - { - OrderingId = row.Ordering, - TagValue = tag, - SequenceNumber = row.SequenceNumber, - PersistenceId = row.PersistenceId - })); - } - - await db.GetTable().BulkCopyAsync(new BulkCopyOptions - { - BulkCopyType = BulkCopyType.MultipleRows, - UseParameters = config.PreferParametersOnMultiRowInsert, - MaxBatchSize = config.DbRoundTripTagBatchSize - }, tagList); - - await tx.CommitAsync(); - } - catch (Exception e1) - { - try - { - await tx.RollbackAsync(); - } - catch (Exception e2) - { - throw new AggregateException( - $"Migration failed on offset {startOffset} to {startOffset + batchSize}, Rollback failed.", - e2, e1); - } - throw new Exception( - $"Migration failed on offset {startOffset} to {startOffset + batchSize}, Rollback successful.", - e1); - } - } - startOffset += batchSize; - } - } - } -} \ No newline at end of file