diff --git a/scripts/MySql/1_Migration_Setup.sql b/scripts/MySql/1_Migration_Setup.sql index 20416b80..ec3af2c4 100644 --- a/scripts/MySql/1_Migration_Setup.sql +++ b/scripts/MySql/1_Migration_Setup.sql @@ -6,10 +6,10 @@ CREATE TABLE IF NOT EXISTS tags( PRIMARY KEY (ordering_id, tag, persistence_id) ); -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; DELIMITER ?? -CREATE PROCEDURE Split() +CREATE PROCEDURE AkkaMigration_Split() BEGIN DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; diff --git a/scripts/MySql/2_Migration.sql b/scripts/MySql/2_Migration.sql index 56ca2d4e..f9ef335b 100644 --- a/scripts/MySql/2_Migration.sql +++ b/scripts/MySql/2_Migration.sql @@ -1 +1 @@ -CALL Split(); \ No newline at end of file +CALL AkkaMigration_Split(); \ No newline at end of file diff --git a/scripts/MySql/3_Post_Migration_Cleanup.sql b/scripts/MySql/3_Post_Migration_Cleanup.sql index 74cc5203..58337716 100644 --- a/scripts/MySql/3_Post_Migration_Cleanup.sql +++ b/scripts/MySql/3_Post_Migration_Cleanup.sql @@ -1 +1 @@ -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; diff --git a/scripts/MySql/Batched/1_Migration_Setup.sql b/scripts/MySql/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..453a486c --- /dev/null +++ b/scripts/MySql/Batched/1_Migration_Setup.sql @@ -0,0 +1,91 @@ +CREATE TABLE IF NOT EXISTS tags( + ordering_id BIGINT NOT NULL, + tag NVARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255), + PRIMARY KEY (ordering_id, tag, persistence_id) +); + +DROP PROCEDURE IF EXISTS AkkaMigration_Split; + +DELIMITER ?? +CREATE PROCEDURE AkkaMigration_Split(IN fromId INT, IN toId INT) +BEGIN + + DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; + DECLARE Id INT UNSIGNED; + DECLARE String VARCHAR(8000); + DECLARE PId VARCHAR(255); + DECLARE SeqNr INT UNSIGNED; + DECLARE idx INT UNSIGNED; + DECLARE slice VARCHAR(8000); + + DECLARE v_cursor CURSOR FOR + SELECT ej.`ordering`, ej.tags + FROM event_journal ej + WHERE ej.`ordering` >= fromId AND ej.`ordering` <= toId + ORDER BY ej.`ordering`; + DECLARE CONTINUE HANDLER FOR NOT FOUND + SET v_cursor_done = 1; + + OPEN v_cursor; + REPEAT + FETCH v_cursor INTO Id, String, SeqNr, PId; + SET idx = 1; + + IF String IS NULL OR LENGTH(String) < 1 THEN + SET idx = 0; + END IF; + + WHILE idx != 0 DO + SET idx = LOCATE(';', String); + IF idx != 0 THEN + SET slice = LEFT(String, idx - 1); + ELSE + SET slice = String; + END IF; + + IF LENGTH(slice) > 0 THEN + INSERT IGNORE INTO tags (ordering_id, tag, sequence_nr, persistence_id) VALUES (Id, slice, SeqNr, PId); + END IF; + + SET String = RIGHT(String, LENGTH(String) - idx); + + IF LENGTH(String) = 0 THEN + SET idx = 0; + END IF; + END WHILE; + UNTIL v_cursor_done END REPEAT; + + CLOSE v_cursor; + +END?? + +CREATE PROCEDURE AkkaMigration_BatchedMigration(IN fromId BIGINT) +BEGIN + DECLARE maxId BIGINT UNSIGNED; + DECLARE oldCommitValue TINYINT DEFAULT @@autocommit; + + SELECT maxId = MAX(ej.`ordering`) + FROM event_journal ej + WHERE + ej.`tags` IS NOT null + AND LENGTH(ej.`tags`) > 0 + AND ej.`ordering` NOT IN (SELECT t.`ordering_id` FROM tags t); + + loopLabel: LOOP + IF fromId > maxId THEN + LEAVE loopLabel; + END IF; + + SET autocommit = 0; + START TRANSACTION; + CALL AkkaMigration_Split(fromId, fromId + 1000); + COMMIT; + SET autocommit = oldCommitValue; + + SET fromId = fromId + 1000; + END LOOP; +END ?? + +DELIMITER ; \ No newline at end of file diff --git a/scripts/MySql/Batched/2_Migration.sql b/scripts/MySql/Batched/2_Migration.sql new file mode 100644 index 00000000..7857f995 --- /dev/null +++ b/scripts/MySql/Batched/2_Migration.sql @@ -0,0 +1 @@ +CALL AkkaMigration_BatchedMigration(0); \ No newline at end of file diff --git a/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql b/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..3df6af9a --- /dev/null +++ b/scripts/MySql/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,2 @@ +DROP PROCEDURE IF EXISTS AkkaMigration_Split; +DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration; \ No newline at end of file diff --git a/scripts/MySql/Batched/DROP_migration.sql b/scripts/MySql/Batched/DROP_migration.sql new file mode 100644 index 00000000..17e2faf0 --- /dev/null +++ b/scripts/MySql/Batched/DROP_migration.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS TagTable; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; +DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration; \ No newline at end of file diff --git a/scripts/MySql/DROP_migration.sql b/scripts/MySql/DROP_migration.sql index 7fd4c24e..d4ebd04c 100644 --- a/scripts/MySql/DROP_migration.sql +++ b/scripts/MySql/DROP_migration.sql @@ -1,2 +1,2 @@ DROP TABLE IF EXISTS TagTable; -DROP PROCEDURE IF EXISTS Split; +DROP PROCEDURE IF EXISTS AkkaMigration_Split; diff --git a/scripts/PostgreSql/1_Migration_Setup.sql b/scripts/PostgreSql/1_Migration_Setup.sql index ee6096c9..8fbe76a6 100644 --- a/scripts/PostgreSql/1_Migration_Setup.sql +++ b/scripts/PostgreSql/1_Migration_Setup.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS "public"."tags"( PRIMARY KEY (ordering_id, tag, persistence_id) ); -CREATE OR REPLACE PROCEDURE "public"."Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ DECLARE var_t record; BEGIN FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) @@ -19,12 +19,12 @@ BEGIN END $$ LANGUAGE plpgsql; -CREATE OR REPLACE PROCEDURE "public"."Normalize"() AS $$ +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"() AS $$ DECLARE var_r record; BEGIN - FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering") - LOOP - CALL "public"."Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); + FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering") + LOOP + CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); END LOOP; END $$ LANGUAGE plpgsql; diff --git a/scripts/PostgreSql/2_Migration.sql b/scripts/PostgreSql/2_Migration.sql index 64949092..20850ce8 100644 --- a/scripts/PostgreSql/2_Migration.sql +++ b/scripts/PostgreSql/2_Migration.sql @@ -1 +1 @@ -CALL "public"."Normalize"(); \ No newline at end of file +CALL "public"."AkkaMigration_Normalize"(); \ No newline at end of file diff --git a/scripts/PostgreSql/3_Post_Migration_Cleanup.sql b/scripts/PostgreSql/3_Post_Migration_Cleanup.sql index e6b2f6f2..77f5f5e7 100644 --- a/scripts/PostgreSql/3_Post_Migration_Cleanup.sql +++ b/scripts/PostgreSql/3_Post_Migration_Cleanup.sql @@ -1,2 +1,2 @@ -DROP PROCEDURE IF EXISTS "public"."Split"; -DROP PROCEDURE IF EXISTS "public"."Normalize"; \ No newline at end of file +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/1_Migration_Setup.sql b/scripts/PostgreSql/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..99c6bf0e --- /dev/null +++ b/scripts/PostgreSql/Batched/1_Migration_Setup.sql @@ -0,0 +1,55 @@ +CREATE TABLE IF NOT EXISTS "public"."tags"( + ordering_id BIGINT NOT NULL, + tag VARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + PRIMARY KEY (ordering_id, tag) +); + +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$ +DECLARE var_t record; +BEGIN + FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) + LOOP + CONTINUE WHEN var_t.t IS NULL OR var_t.t = ''; + INSERT INTO "public"."tags" (ordering_id, tag, sequence_nr, persistence_id) + VALUES (id, var_t.t, seq_nr, pid) + ON CONFLICT DO NOTHING; + END LOOP; +END +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"(IN fromId BIGINT, IN toId BIGINT) AS $$ +DECLARE var_r record; +BEGIN + FOR var_r IN( + SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid + FROM "public"."event_journal" AS ej + WHERE ej.ordering >= fromId AND ej.ordering <= toId + ORDER BY "ordering") + LOOP + CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid); + END LOOP; +END +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_BatchedMigration"(IN from_id BIGINT) AS $$ +DECLARE max_id BIGINT; +BEGIN + max_id := (SELECT MAX(ej."ordering") + FROM event_journal ej + WHERE + ej."tags" IS NOT NULL + AND LENGTH(ej."tags") > 0 + AND ej."ordering" NOT IN (SELECT t."ordering_id" FROM "public"."tags" t)); + + LOOP + EXIT WHEN from_id > max_id; + + CALL "public"."AkkaMigration_Normalize"(from_id, from_id + 1000); + COMMIT; + + from_id := from_id + 1000; + END LOOP; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/2_Migration.sql b/scripts/PostgreSql/Batched/2_Migration.sql new file mode 100644 index 00000000..5da5d3f7 --- /dev/null +++ b/scripts/PostgreSql/Batched/2_Migration.sql @@ -0,0 +1 @@ +CALL "public"."AkkaMigration_BatchedMigration"(0); \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql b/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..5a02e65e --- /dev/null +++ b/scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,3 @@ +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration"; \ No newline at end of file diff --git a/scripts/PostgreSql/Batched/DROP_migration.sql b/scripts/PostgreSql/Batched/DROP_migration.sql new file mode 100644 index 00000000..d5c7549f --- /dev/null +++ b/scripts/PostgreSql/Batched/DROP_migration.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS "public"."TagTable"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration"; \ No newline at end of file diff --git a/scripts/PostgreSql/DROP_migration.sql b/scripts/PostgreSql/DROP_migration.sql index e7c3046b..47ce4af0 100644 --- a/scripts/PostgreSql/DROP_migration.sql +++ b/scripts/PostgreSql/DROP_migration.sql @@ -1,3 +1,3 @@ DROP TABLE IF EXISTS "public"."TagTable"; -DROP PROCEDURE IF EXISTS "public"."Split"; -DROP PROCEDURE IF EXISTS "public"."Normalize"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split"; +DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize"; diff --git a/scripts/SqlServer/1_Migration_Setup.sql b/scripts/SqlServer/1_Migration_Setup.sql index 64f97e9f..20eecc6e 100644 --- a/scripts/SqlServer/1_Migration_Setup.sql +++ b/scripts/SqlServer/1_Migration_Setup.sql @@ -11,7 +11,7 @@ BEGIN END GO -CREATE OR ALTER FUNCTION [dbo].[Split](@String VARCHAR(8000), @Delimiter CHAR(1)) +CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1)) RETURNS @temptable TABLE (items VARCHAR(8000)) AS BEGIN DECLARE @idx INT diff --git a/scripts/SqlServer/2_Migration.sql b/scripts/SqlServer/2_Migration.sql index 55fc033b..550f4e9a 100644 --- a/scripts/SqlServer/2_Migration.sql +++ b/scripts/SqlServer/2_Migration.sql @@ -1,8 +1,8 @@ INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id]) SELECT * FROM ( - SELECT a.[Ordering], b.[items], a.SequenceNr, a.PersistenceId FROM - [dbo].[EventJournal] AS a - CROSS APPLY [dbo].[Split](a.Tags, ';') b + SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM + [dbo].[EventJournal] AS records + CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) WHERE NOT EXISTS ( SELECT * FROM [dbo].[tags] t WITH (updlock) diff --git a/scripts/SqlServer/3_Post_Migration_Cleanup.sql b/scripts/SqlServer/3_Post_Migration_Cleanup.sql index b46a9195..575e5cc3 100644 --- a/scripts/SqlServer/3_Post_Migration_Cleanup.sql +++ b/scripts/SqlServer/3_Post_Migration_Cleanup.sql @@ -1 +1 @@ -DROP FUNCTION IF EXISTS [dbo].[Split]; \ No newline at end of file +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/1_Migration_Setup.sql b/scripts/SqlServer/Batched/1_Migration_Setup.sql new file mode 100644 index 00000000..c6394bb0 --- /dev/null +++ b/scripts/SqlServer/Batched/1_Migration_Setup.sql @@ -0,0 +1,71 @@ +IF NOT EXISTS(SELECT * FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = N'dbo' AND TABLE_NAME = N'tags') +BEGIN + CREATE TABLE [dbo].[tags]( + ordering_id BIGINT NOT NULL, + tag NVARCHAR(64) NOT NULL, + sequence_nr BIGINT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + PRIMARY KEY (ordering_id, tag, persistence_id) + ); +END +GO + +CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1)) + RETURNS @temptable TABLE (items VARCHAR(8000)) AS +BEGIN + DECLARE @idx INT + DECLARE @slice VARCHAR(8000) + + SELECT @idx = 1 + IF LEN(@String) < 1 OR @String is NULL + RETURN + + WHILE @idx != 0 + BEGIN + SET @idx = CHARINDEX(@Delimiter, @String) + IF @idx != 0 + SET @slice = LEFT(@String,@idx - 1) + ELSE + SET @slice = @String + + IF(LEN(@slice) > 0) + INSERT INTO @temptable(Items) VALUES(@slice) + + SET @String = RIGHT(@String, LEN(@String) - @idx) + IF len(@String) = 0 + BREAK + END + RETURN +END; +GO + +CREATE OR ALTER PROCEDURE [dbo].[AkkaMigration_BatchedMigration](@from_id BIGINT) AS +BEGIN + DECLARE @max_id BIGINT; + + SELECT @max_id = MAX(ej.[Ordering]) + FROM [dbo].[EventJournal] ej + WHERE + ej.[Tags] IS NOT NULL + AND LEN(ej.[Tags]) > 0 + AND ej.[Ordering] NOT IN (SELECT t.[ordering_id] FROM [dbo].[tags] t); + + WHILE @from_id <= @max_id + BEGIN + BEGIN TRAN; + INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id]) + SELECT * FROM ( + SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM + [dbo].[EventJournal] AS records + CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product + ) AS s([ordering_id], [tag], [sequence_nr], [persistence_id]) + WHERE NOT EXISTS ( + SELECT * FROM [dbo].[tags] t WITH (updlock) + WHERE s.[ordering_id] = t.[ordering_id] AND s.[tag] = t.[tag] + ); + COMMIT TRAN; + + SET @from_id = @from_id + 1000; + END +END; diff --git a/scripts/SqlServer/Batched/2_Migration.sql b/scripts/SqlServer/Batched/2_Migration.sql new file mode 100644 index 00000000..2b04066c --- /dev/null +++ b/scripts/SqlServer/Batched/2_Migration.sql @@ -0,0 +1 @@ +EXEC [dbo].[AkkaMigration_BatchedMigration] @from_id = 0; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql b/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql new file mode 100644 index 00000000..8b2a02b3 --- /dev/null +++ b/scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql @@ -0,0 +1,2 @@ +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; +DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration]; \ No newline at end of file diff --git a/scripts/SqlServer/Batched/DROP_Migration.sql b/scripts/SqlServer/Batched/DROP_Migration.sql new file mode 100644 index 00000000..2d6f23ff --- /dev/null +++ b/scripts/SqlServer/Batched/DROP_Migration.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS [dbo].[TagTable]; +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; +DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration]; \ No newline at end of file diff --git a/scripts/SqlServer/DROP_Migration.sql b/scripts/SqlServer/DROP_Migration.sql index e3cf6a35..3b89c8a7 100644 --- a/scripts/SqlServer/DROP_Migration.sql +++ b/scripts/SqlServer/DROP_Migration.sql @@ -1,2 +1,2 @@ DROP TABLE IF EXISTS [dbo].[TagTable]; -DROP FUNCTION IF EXISTS [dbo].[Split]; \ No newline at end of file +DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split]; \ No newline at end of file